Mirror of https://github.com/PaddlePaddle/FastDeploy.git
Synced 2025-10-03 07:46:50 +08:00

Compare commits: copilot/ad... ... feature/on... (37 commits; author and date columns were not captured by this view)

5c3c1b68a6, edf1ca07af, 33abfddd9b, bbd50c6717, 132a8ef425, e11331927f, 81092c0fe3,
ad816f20f4, 37b76158f9, fe2094609f, b4bb54b56b, eeec4bd15e, d2592750f7, 25f51b0611,
9b07f85f6d, 2fe31c6f0f, a33e557732, 054c790642, ca4e4ab911, c000cff744, 86ff68be4b,
702c313ed1, 6706ccb37e, 1b6f482c15, 5d3bf308f6, f672a34f95, bc0b92bba4, 3dd8492601,
bd77a3a643, 9561603ed9, e26313a355, 4367c09a5f, 8e789dcb67, 5f6fc7f7b9, d4059cabf0,
c8dd5976ae, 4880c16be3
.github/workflows/_build_linux.yml (vendored, 6 changed lines)

@@ -124,14 +124,12 @@ jobs:
echo "Date Only: $DATE_ONLY"
export FASTDEPLOY_VERSION="${FASTDEPLOY_VERSION}.dev${DATE_ONLY}"
fi
pip config set global.index-url http://pip.baidu.com/root/baidu/+simple/
pip config set install.trusted-host pip.baidu.com
pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install wheel
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
# Build RDMA
export ENABLE_FD_RDMA=1
bash build.sh 1 python false [${COMPILE_ARCH}]
.github/workflows/_logprob_test_linux.yml (vendored, 55 changed lines)

@@ -62,18 +62,22 @@ jobs:
MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }}
run: |
runner_name="${{ runner.name }}"
last_char="${runner_name: -1}"
CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

if [[ "$last_char" =~ [0-7] ]]; then
DEVICES="$last_char"
else
DEVICES="0"
fi

FLASK_PORT=$((9160 + DEVICES * 100))
FD_API_PORT=$((9180 + DEVICES * 100))
FD_ENGINE_QUEUE_PORT=$((9150 + DEVICES * 100))
FD_METRICS_PORT=$((9170 + DEVICES * 100))
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
echo "FD_API_PORT=${FD_API_PORT}"
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="

CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}"
echo "CACHE_DIR is set to ${CACHE_DIR}"
@@ -85,7 +89,24 @@ jobs:
exit 1
fi

PARENT_DIR=$(dirname "$WORKSPACE")
PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log"
echo "==== LOG_FILE is ${LOG_FILE} ===="

echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE

for port in "${PORTS[@]}"; do
PIDS=$(lsof -t -i :$port || true)
if [ -n "$PIDS" ]; then
echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE
echo "$PIDS" | xargs -r kill -9
echo "Port $port cleared" | tee -a $LOG_FILE
else
echo "Port $port is free" | tee -a $LOG_FILE
fi
done

echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE

docker run --ipc=host --pid=host --net=host \
-v $(pwd):/workspace \
@@ -100,13 +121,11 @@ jobs:
-v "${CACHE_DIR}/.cache:/root/.cache" \
-v "${CACHE_DIR}/ConfigDir:/root/.config" \
-e TZ="Asia/Shanghai" \
--gpus '"device='"${DEVICES}"'"' ${docker_image} /bin/bash -c '
# python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip install paddlepaddle-gpu==3.0.0.dev20250729 -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
--gpus '"device='"${DEVICES}"'"' ${docker_image} /bin/bash -xc '
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/

pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

pip config set global.index-url http://pip.baidu.com/root/baidu/+simple/
pip config set install.trusted-host pip.baidu.com
pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install ${fastdeploy_wheel_url}

wget https://paddle-qa.bj.bcebos.com/zhengtianyu/tools/llm-deploy-linux-amd64
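The logprob job switches from deriving the GPU device from the runner name's last character to deriving a device list and base port offset from its trailing card id. A hedged Python sketch of the same arithmetic (the runner name below is made up):

```python
# Hedged sketch of the port scheme above; "gpu-h1z1-45" is an invented runner name.
def derive_ports(runner_name: str) -> dict:
    card_id = runner_name.rsplit("-", 1)[-1]        # awk -F'-' '{print $NF}'
    devices = ",".join(card_id)                     # fold -w1 | paste -sd,
    device_port = int(devices.split(",")[0])        # cut -d',' -f1
    return {
        "DEVICES": devices,
        "FLASK_PORT": 42068 + device_port * 100,
        "FD_API_PORT": 42088 + device_port * 100,
        "FD_ENGINE_QUEUE_PORT": 42058 + device_port * 100,
        "FD_METRICS_PORT": 42078 + device_port * 100,
    }

print(derive_ports("gpu-h1z1-45"))
# {'DEVICES': '4,5', 'FLASK_PORT': 42468, 'FD_API_PORT': 42488, ...}
```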
.github/workflows/_pre_ce_test.yml (vendored, new file, 138 lines)

@@ -0,0 +1,138 @@
name: Pre-CE-Test

on:
workflow_call:
inputs:
DOCKER_IMAGE:
description: "Build Images"
required: true
type: string
default: "ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddle:fastdeploy-ciuse-cuda126"
FASTDEPLOY_ARCHIVE_URL:
description: "URL of the compressed FastDeploy code archive."
required: true
type: string
FASTDEPLOY_WHEEL_URL:
description: "URL of the FastDeploy Wheel."
required: true
type: string
CACHE_DIR:
description: "Cache Dir Use"
required: false
type: string
default: ""
MODEL_CACHE_DIR:
description: "Cache Dir Use"
required: false
type: string
default: ""

concurrency:
group: ${{ github.event.pull_request.number }}
cancel-in-progress: true

jobs:
run_ce_cases:
runs-on: [self-hosted, PRE_CE_RUN_2Card]
steps:
- name: Print current runner name
run: |
echo "Current runner name: ${{ runner.name }}"
- name: Code Prepare
shell: bash
env:
docker_image: ${{ inputs.DOCKER_IMAGE }}
fd_archive_url: ${{ inputs.FASTDEPLOY_ARCHIVE_URL }}
run: |
set -x
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"

# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."
rm -rf ${REPO_NAME}*
fi
'

wget -q ${fd_archive_url}
tar -xf FastDeploy.tar.gz
rm -rf FastDeploy.tar.gz
cd FastDeploy
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git log -n 3 --oneline

- name: Run CI unittest
env:
docker_image: ${{ inputs.DOCKER_IMAGE }}
fd_wheel_url: ${{ inputs.FASTDEPLOY_WHEEL_URL }}
CACHE_DIR: ${{ inputs.CACHE_DIR }}
MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }}
run: |
runner_name="${{ runner.name }}"
CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
echo "FD_API_PORT=${FD_API_PORT}"
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="

CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}"
echo "CACHE_DIR is set to ${CACHE_DIR}"
if [ ! -f "${CACHE_DIR}/gitconfig" ]; then
touch "${CACHE_DIR}/gitconfig"
fi

PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log"
echo "==== LOG_FILE is ${LOG_FILE} ===="

echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE

for port in "${PORTS[@]}"; do
PIDS=$(lsof -t -i :$port || true)
if [ -n "$PIDS" ]; then
echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE
echo "$PIDS" | xargs -r kill -9
echo "Port $port cleared" | tee -a $LOG_FILE
else
echo "Port $port is free" | tee -a $LOG_FILE
fi
done

echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE

docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \
-v "${CACHE_DIR}/.cache:/root/.cache" \
-v "${CACHE_DIR}/ConfigDir:/root/.config" \
-v "${MODEL_CACHE_DIR}:/ModelData:ro" \
-e "MODEL_PATH=/ModelData" \
-e "FD_API_PORT=${FD_API_PORT}" \
-e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e "fd_wheel_url=${fd_wheel_url}" \
--gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c '
git config --global --add safe.directory /workspace/FastDeploy
cd FastDeploy
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip install ${fd_wheel_url}
bash scripts/run_pre_ce.sh
'
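The pre-CE job frees every derived service port before starting the test container. A hedged Python equivalent of that cleanup loop (it assumes the lsof binary is available on the runner, just like the shell version):

```python
# Hedged Python equivalent of the port-cleanup loop above; an empty lsof result
# means the port is already free, otherwise every listener is SIGKILLed.
import os
import signal
import subprocess

def free_port(port: int) -> None:
    result = subprocess.run(["lsof", "-t", "-i", f":{port}"], capture_output=True, text=True)
    pids = [int(pid) for pid in result.stdout.split()]
    for pid in pids:
        os.kill(pid, signal.SIGKILL)
    print(f"Port {port} {'cleared' if pids else 'is free'}")

for port in (42068, 42088, 42058, 42078):
    free_port(port)
```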
.github/workflows/_unit_test_coverage.yml (vendored, 207 changed lines)

@@ -22,13 +22,19 @@ on:
required: false
type: string
default: ""
MODEL_CACHE_DIR:
description: "Cache Dir Use"
required: false
type: string
default: ""

jobs:
run_tests_with_coverage:
runs-on: [self-hosted, GPU-h1z1-4Cards]
runs-on: [self-hosted, GPU-h1z1-2Cards]
outputs:
diff_cov_file_url: ${{ steps.cov_upload.outputs.diff_cov_file_url }}
unittest_failed_url: ${{ steps.unittest_failed.outputs.unittest_failed_url }}
unittest_failed_url: ${{ steps.cov_upload.outputs.unittest_failed_url }}
diff_cov_result_json_url: ${{ steps.cov_upload.outputs.diff_cov_result_json_url }}
steps:
- name: Code Prepare
shell: bash
@@ -66,58 +72,102 @@ jobs:
fd_wheel_url: ${{ inputs.FASTDEPLOY_WHEEL_URL }}
CACHE_DIR: ${{ inputs.CACHE_DIR }}
BASE_REF: ${{ github.event.pull_request.base.ref }}
MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }}
run: |
set -x
runner_name="${{ runner.name }}"
CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
gpu_id=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
set -x
runner_name="${{ runner.name }}"
CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}"
echo "CACHE_DIR is set to ${CACHE_DIR}"
if [ ! -f "${CACHE_DIR}/gitconfig" ]; then
touch "${CACHE_DIR}/gitconfig"
fi
PARENT_DIR=$(dirname "$WORKSPACE")
echo "PARENT_DIR:$PARENT_DIR"
docker run --rm --net=host \
--cap-add=SYS_PTRACE --privileged --shm-size=64G \
-v $(pwd):/workspace -w /workspace \
-v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \
-v "${CACHE_DIR}/.cache:/root/.cache" \
-v "${CACHE_DIR}/ConfigDir:/root/.config" \
-e TZ="Asia/Shanghai" \
-e "fd_wheel_url=${fd_wheel_url}" \
-e "BASE_REF=${BASE_REF}" \
--gpus "\"device=${gpu_id}\"" ${docker_image} /bin/bash -c '
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
echo "FD_API_PORT=${FD_API_PORT}"
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="

git config --global --add safe.directory /workspace/FastDeploy
cd FastDeploy
# python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip install paddlepaddle-gpu==3.0.0.dev20250729 -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}"
echo "CACHE_DIR is set to ${CACHE_DIR}"
if [ ! -f "${CACHE_DIR}/gitconfig" ]; then
touch "${CACHE_DIR}/gitconfig"
fi

pip config set global.index-url http://pip.baidu.com/root/baidu/+simple/
pip config set install.trusted-host pip.baidu.com
pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log"
echo "==== LOG_FILE is ${LOG_FILE} ===="

echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE

for port in "${PORTS[@]}"; do
PIDS=$(lsof -t -i :$port || true)
if [ -n "$PIDS" ]; then
echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE
echo "$PIDS" | xargs -r kill -9
echo "Port $port cleared" | tee -a $LOG_FILE
else
echo "Port $port is free" | tee -a $LOG_FILE
fi
done

echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE

docker run --rm --net=host \
--cap-add=SYS_PTRACE --shm-size=64G \
-v $(pwd):/workspace -w /workspace \
-v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \
-v "${CACHE_DIR}/.cache:/root/.cache" \
-v "${CACHE_DIR}/ConfigDir:/root/.config" \
-v "${MODEL_CACHE_DIR}:/ModelData:ro" \
-e "MODEL_PATH=/ModelData" \
-e "FD_API_PORT=${FD_API_PORT}" \
-e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e TZ="Asia/Shanghai" \
-e "fd_wheel_url=${fd_wheel_url}" \
-e "BASE_REF=${BASE_REF}" \
--gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c '

git config --global --add safe.directory /workspace/FastDeploy
cd FastDeploy
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/

pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

python -m pip install coverage
python -m pip install diff-cover
python -m pip install ${fd_wheel_url}
if [ -d "test/plugins" ]; then
cd test/plugins
python setup.py install
cd ../..
else
echo "Warning: test/plugins directory not found, skipping setup.py install"
fi
export COVERAGE_FILE=/workspace/FastDeploy/coveragedata/.coverage
export COVERAGE_RCFILE=/workspace/FastDeploy/scripts/.coveragerc
TEST_EXIT_CODE=0
bash scripts/coverage_run.sh || TEST_EXIT_CODE=8
git diff origin/${BASE_REF}..HEAD --unified=0 > diff.txt
echo "TEST_EXIT_CODE=${TEST_EXIT_CODE}" >> exit_code.env
coverage combine coveragedata/
coverage xml -o python_coverage_all.xml
COVERAGE_EXIT_CODE=0
diff-cover python_coverage_all.xml --diff-file=diff.txt --fail-under=80 --json-report diff_coverage.json || COVERAGE_EXIT_CODE=9
echo "COVERAGE_EXIT_CODE=${COVERAGE_EXIT_CODE}" >> exit_code.env
python scripts/generate_diff_coverage_xml.py diff.txt python_coverage_all.xml
'
if [ -f FastDeploy/exit_code.env ]; then
cat FastDeploy/exit_code.env >> $GITHUB_ENV
fi

python -m pip install coverage
python -m pip install diff-cover
python -m pip install ${fd_wheel_url}
export COVERAGE_FILE=/workspace/FastDeploy/coveragedata/.coverage
export COVERAGE_RCFILE=/workspace/FastDeploy/scripts/.coveragerc
TEST_EXIT_CODE=0
bash scripts/coverage_run.sh || TEST_EXIT_CODE=8
git diff origin/${BASE_REF}..HEAD --unified=0 > diff.txt
echo "TEST_EXIT_CODE=${TEST_EXIT_CODE}" >> exit_code.env
coverage combine coveragedata/
coverage xml -o python_coverage_all.xml
COVERAGE_EXIT_CODE=0
diff-cover python_coverage_all.xml --diff-file=diff.txt --fail-under=90 || COVERAGE_EXIT_CODE=9
echo "COVERAGE_EXIT_CODE=${COVERAGE_EXIT_CODE}" >> exit_code.env
python scripts/generate_diff_coverage_xml.py diff.txt python_coverage_all.xml
'
if [ -f FastDeploy/exit_code.env ]; then
cat FastDeploy/exit_code.env >> $GITHUB_ENV
fi
- name: Upload unit result and diff coverage to bos
id: cov_upload
shell: bash
@@ -125,30 +175,79 @@ jobs:
cd FastDeploy
commit_id=${{ github.event.pull_request.head.sha }}
pr_num=${{ github.event.pull_request.number }}
target_path=paddle-github-action/PR/FastDeploy/${pr_num}/${commit_id}/SM${compile_arch//,/_}/CoverageData
target_path=paddle-github-action/PR/FastDeploy/${pr_num}/${commit_id}/SM${compile_arch//,/_}
wget -q --no-proxy --no-check-certificate https://paddle-qa.bj.bcebos.com/CodeSync/develop/PaddlePaddle/PaddleTest/tools/bos_tools.py
push_file=$(realpath bos_tools.py)
python -m pip install bce-python-sdk==0.9.29
diff_cov_file="diff_coverage.xml"
if [ -f ${diff_cov_file} ];then
python ${push_file} ${diff_cov_file} ${target_path}
python ${push_file} ${diff_cov_file} ${target_path}/CoverageData
target_path_stripped="${target_path#paddle-github-action/}"
DIFF_COV_FILE_URL=https://paddle-github-action.bj.bcebos.com/${target_path_stripped}/${diff_cov_file}
DIFF_COV_FILE_URL=https://paddle-github-action.bj.bcebos.com/${target_path_stripped}/CoverageData/${diff_cov_file}
echo "diff_cov_file_url=${DIFF_COV_FILE_URL}" >> $GITHUB_OUTPUT
echo "diff_cov_file_url=${DIFF_COV_FILE_URL}" >> $GITHUB_ENV
fi
- name: Determine Unit Success and whether the coverage rate reaches 90%
diff_cov_result_json="diff_coverage.json"
if [ -f ${diff_cov_result_json} ];then
python ${push_file} ${diff_cov_result_json} ${target_path}/CoverageData
target_path_stripped="${target_path#paddle-github-action/}"
DIFF_COV_JSON_URL=https://paddle-github-action.bj.bcebos.com/${target_path_stripped}/CoverageData/${diff_cov_result_json}
echo "diff_cov_result_json_url=${DIFF_COV_JSON_URL}" >> $GITHUB_OUTPUT
echo "diff_cov_result_json_url=${DIFF_COV_JSON_URL}" >> $GITHUB_ENV
fi
unittest_result="test/failed_tests.log"
if [ -s ${unittest_result} ];then
python ${push_file} ${unittest_result} ${target_path}/UnitTestResult
target_path_stripped="${target_path#paddle-github-action/}"
UNIT_TEST_RESULT_URL=https://paddle-github-action.bj.bcebos.com/${target_path_stripped}/UnitTestResult/${unittest_result}
echo "unittest_failed_url=${UNIT_TEST_RESULT_URL}" >> $GITHUB_OUTPUT
echo "unittest_failed_url=${UNIT_TEST_RESULT_URL}" >> $GITHUB_ENV
fi
- name: Check Unit Test Success
shell: bash
run: |
cd FastDeploy
if [ "$TEST_EXIT_CODE" -eq 8 ]; then
filename=$(basename "$unittest_failed_url")
if [ -z "${unittest_failed_url}" ]; then
echo "No diff unit failed file URL provided."
else
rm -rf "${filename}"
wget -O ${filename} ${unittest_failed_url} || echo "Download unittest file failed, but continuing..."
fi
echo "Unit tests failed (exit code 8)"
if [ -f "${filename}" ];then
echo "Failed test cases:"
cat "${filename}"
fi
exit "$TEST_EXIT_CODE"
fi
echo "All tests passed"

- name: Verify Code Coverage Threshold (80%)
shell: bash
run: |
cd FastDeploy
if [ "$COVERAGE_EXIT_CODE" -eq 9 ]; then
echo "Coverage generation failed (exit code 9)"
filename=$(basename "$diff_cov_result_json_url")
if [ -z "${diff_cov_result_json_url}" ]; then
echo "No diff cov result file URL provided."
else
rm -rf "${filename}"
wget -O ${filename} ${diff_cov_result_json_url} || echo "Download cov json file failed, but continuing..."
fi
if [ -f "${filename}" ];then
echo "Failed test cases:"
if command -v jq >/dev/null 2>&1; then
jq . "${filename}"
else
cat "${filename}"
fi
fi
exit "$COVERAGE_EXIT_CODE"
fi
echo "All tests and coverage passed"
echo "coverage passed"
exit 0

diff_coverage_report:
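The reworked coverage job records TEST_EXIT_CODE (8 means unit tests failed) and COVERAGE_EXIT_CODE (9 means diff coverage fell below the 80% threshold) in exit_code.env, and the two follow-up steps gate on those values. A hedged sketch of that gate, assuming the same file format:

```python
# Hedged sketch of the exit-code gate; exit_code.env is assumed to hold lines
# such as TEST_EXIT_CODE=8 and COVERAGE_EXIT_CODE=9, as written by the job above.
def read_exit_codes(path: str = "exit_code.env") -> dict:
    codes = {}
    with open(path) as handle:
        for line in handle:
            key, _, value = line.strip().partition("=")
            if key and value.isdigit():
                codes[key] = int(value)
    return codes

codes = read_exit_codes()
if codes.get("TEST_EXIT_CODE", 0) == 8:
    raise SystemExit("Unit tests failed (exit code 8)")
if codes.get("COVERAGE_EXIT_CODE", 0) == 9:
    raise SystemExit("Diff coverage below the 80% threshold (exit code 9)")
print("All tests and coverage passed")
```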
.github/workflows/approve.yml (vendored, 1 changed line)

@@ -33,7 +33,6 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: '3.10'
cache: 'pip'

- name: Run approval check script
run: |
.github/workflows/ci.yml (vendored, 89 changed lines)

@@ -1,89 +0,0 @@
name: CI

on:
pull_request:
branches:
- develop
- 'release/*'
workflow_dispatch:

concurrency:
group: ${{ github.event.pull_request.number }}
cancel-in-progress: true

jobs:
build:
runs-on: [self-hosted, GPU-L20-4Card]
steps:
- name: Print current runner name
run: |
echo "Current runner name: ${{ runner.name }}"
# Because the system version is lower than 2.23, the checkout cannot be used.
# - name: Checkout code
# uses: actions/checkout@v4

- name: Code Checkout
env:
docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddle:fastdeploy-ciuse-cuda126
run: |
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"
# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
-e "BASE_BRANCH=${BASE_BRANCH}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."
rm -rf ${REPO_NAME}
fi
'
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git clone ${REPO} ${REPO_NAME} -b ${BASE_BRANCH}
cd FastDeploy
if [ "${{ github.event_name }}" = "pull_request" ]; then
git fetch origin pull/${{ github.event.pull_request.number }}/head:pr/${{ github.event.pull_request.number }}
git merge pr/${{ github.event.pull_request.number }}
git log -n 3 --oneline
else
git checkout ${{ github.sha }}
git log -n 3 --oneline
fi

- name: Run CI unittest
env:
docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddle:fastdeploy-ciuse-cuda126
run: |
runner_name="${{ runner.name }}"
last_char="${runner_name: -1}"

if [ "${last_char}" = "1" ]; then
gpu_id=2
DEVICES="2,3"
else
gpu_id=0
DEVICES="0,1"
fi
FD_API_PORT=$((9180 + gpu_id * 100))
FD_ENGINE_QUEUE_PORT=$((9150 + gpu_id * 100))
FD_METRICS_PORT=$((9170 + gpu_id * 100))

PARENT_DIR=$(dirname "$WORKSPACE")
echo "PARENT_DIR:$PARENT_DIR"
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-v "/ssd4/GithubActions/gitconfig:/etc/gitconfig:ro" \
-v "/ssd4/GithubActions/ModelData:/ModelData:ro" \
-v "/ssd4/GithubActions/CacheDir:/root/.cache" \
-v "/ssd4/GithubActions/ConfigDir:/root/.config" \
-e "MODEL_PATH=/ModelData" \
-e "FD_API_PORT=${FD_API_PORT}" \
-e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
--gpus '"device='"${DEVICES}"'"' ${docker_image} /bin/bash -c "
git config --global --add safe.directory /workspace/FastDeploy
cd FastDeploy
bash scripts/run_ci.sh
"
.github/workflows/pr_build_and_test.yml (vendored, 19 changed lines)

@@ -21,7 +21,7 @@ jobs:
with:
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:cuda126-py310
FASTDEPLOY_ARCHIVE_URL: ${{ needs.clone.outputs.repo_archive_url }}
COMPILE_ARCH: "90"
COMPILE_ARCH: "89,90"
WITH_NIGHTLY_BUILD: "OFF"
FD_VERSION: "0.0.0"

@@ -39,16 +39,27 @@ jobs:
needs: [clone,build]
uses: ./.github/workflows/_unit_test_coverage.yml
with:
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:cuda126-py310
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:fastdeploy-ciuse-cuda126-dailyupdate
FASTDEPLOY_ARCHIVE_URL: ${{ needs.clone.outputs.repo_archive_url }}
FASTDEPLOY_WHEEL_URL: ${{ needs.build.outputs.wheel_path }}
MODEL_CACHE_DIR: "/ssd2/actions-runner/ModelData"

logprob_test:
name: Run FastDeploy LogProb Tests
needs: [build]
uses: ./.github/workflows/_logprob_test_linux.yml
with:
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:cuda126-py310
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:fastdeploy-ciuse-cuda126-dailyupdate
PADDLETEST_ARCHIVE_URL: "https://xly-devops.bj.bcebos.com/PaddleTest/PaddleTest.tar.gz"
FASTDEPLOY_WHEEL_URL: ${{ needs.build.outputs.wheel_path }}
MODEL_CACHE_DIR: "/ssd2/actions-runner/ModelCache"
MODEL_CACHE_DIR: "/ssd2/actions-runner/ModelData"

pre_ce_test:
name: Extracted partial CE model tasks to run in CI.
needs: [clone,build]
uses: ./.github/workflows/_pre_ce_test.yml
with:
DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:fastdeploy-ciuse-cuda126-dailyupdate
FASTDEPLOY_ARCHIVE_URL: ${{ needs.clone.outputs.repo_archive_url }}
FASTDEPLOY_WHEEL_URL: ${{ needs.build.outputs.wheel_path }}
MODEL_CACHE_DIR: "/ssd2/actions-runner/ModelData"
@@ -16,11 +16,17 @@ RUN apt-get update && apt-get install -y libibverbs-dev librdmacm-dev cmake pybi

# uninstall existing package
RUN python -m pip uninstall paddlepaddle-gpu paddlepaddle-xpu -y
# install paddlepaddle
# install paddlepaddle-xpu
RUN python -m pip install --no-cache-dir --progress-bar off paddlepaddle-xpu==${PADDLE_VERSION} -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/

RUN python -m pip install --no-cache-dir fastdeploy-xpu==${FD_VERSION} -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

RUN mkdir -p /workspace/deps && cd /workspace/deps && \
wget https://klx-sdk-release-public.su.bcebos.com/xre/kl3-release/5.0.21.21/xre-Linux-x86_64-5.0.21.21.tar.gz && \
tar -zxf xre-Linux-x86_64-5.0.21.21.tar.gz && mv xre-Linux-x86_64-5.0.21.21 xre

ENV PATH=/workspace/deps/xre/bin:$PATH

ENV http_proxy=""
ENV https_proxy=""
ENV no_proxy=""
@@ -98,7 +98,7 @@ curl -X POST "http://0.0.0.0:9222/v1/chat/completions" \
{"role": "user", "content": "How old are you"}
],
"top_p": 0.8,
"top_k": 50
"top_k": 20
}'
```

@@ -117,7 +117,7 @@ response = client.chat.completions.create(
],
stream=True,
top_p=0.8,
top_k=50
extra_body={"top_k": 20, "min_p":0.1}
)
for chunk in response:
if chunk.choices[0].delta:
@@ -159,8 +159,7 @@ response = client.chat.completions.create(
],
stream=True,
top_p=0.8,
top_k=20,
min_p=0.1
extra_body={"top_k": 20, "min_p":0.1}
)
for chunk in response:
if chunk.choices[0].delta:
@@ -5,7 +5,7 @@
- OS: Linux
- Python: 3.10
- XPU Model: P800
- XPU Driver Version: ≥ 5.0.21.10
- XPU Driver Version: ≥ 5.0.21.26
- XPU Firmware Version: ≥ 1.31

Verified platform:
@@ -15,7 +15,7 @@ Verified platform:
- OS: CentOS release 7.6 (Final)
- Python: 3.10
- XPU Model: P800 (OAM Edition)
- XPU Driver Version: 5.0.21.10
- XPU Driver Version: 5.0.21.26
- XPU Firmware Version: 1.31

**Note:** Currently, only INTEL or Hygon CPU-based P800 (OAM Edition) servers have been verified. Other CPU types and P800 (PCIe Edition) servers have not been tested yet.
@@ -25,9 +25,9 @@ Verified platform:
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```
@@ -37,7 +37,7 @@ docker exec -it fastdeploy-xpu /bin/bash
### Install PaddlePaddle

```bash
python -m pip install paddlepaddle-xpu==3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
python -m pip install paddlepaddle-xpu==3.1.1 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
```

Alternatively, you can install the latest version of PaddlePaddle (Not recommended)
@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### Install FastDeploy (**Do NOT install via PyPI source**)

```bash
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```

Alternatively, you can install the latest version of FastDeploy (Not recommended)
@@ -63,7 +63,7 @@ python -m pip install --pre fastdeploy-xpu -i https://www.paddlepaddle.org.cn/pa
### Install PaddlePaddle

```bash
python -m pip install paddlepaddle-xpu==3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
python -m pip install paddlepaddle-xpu==3.1.1 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
```

Alternatively, you can install the latest version of PaddlePaddle (Not recommended)
@@ -183,6 +183,7 @@ For ```LLM``` configuration, refer to [Parameter Documentation](parameters.md).
* min_p(float): Minimum probability relative to the maximum probability for a token to be considered (>0 filters low-probability tokens to improve quality)
* max_tokens(int): Maximum generated tokens (input + output)
* min_tokens(int): Minimum forced generation length
* bad_words(list[str]): Prohibited words

### 2.5 fastdeploy.engine.request.RequestOutput
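These sampling fields plug into the offline LLM API whose imports appear later in this change set. A hedged usage sketch, assuming a vLLM-style LLM.generate(prompts, sampling_params) signature and an illustrative model path:

```python
# Hedged sketch only: the imports match this change set, but the model path is a
# placeholder and the generate() call is assumed to accept prompts plus sampling params.
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.llm import LLM

llm = LLM(model="PaddlePaddle/ERNIE-4.5-0.3B-Paddle")  # placeholder model path
sampling = SamplingParams(
    top_p=0.8,
    min_p=0.1,          # drop tokens far below the top token's probability
    max_tokens=256,     # cap on generated tokens (input + output)
    bad_words=["foo"],  # single-token words that must not be generated
)
outputs = llm.generate(["How old are you?"], sampling)
for output in outputs:
    print(output)
```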
@@ -5,8 +5,14 @@
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
@@ -98,7 +98,7 @@ curl -X POST "http://0.0.0.0:9222/v1/chat/completions" \
{"role": "user", "content": "How old are you"}
],
"top_p": 0.8,
"top_k": 50
"top_k": 20
}'
```

@@ -118,7 +118,7 @@ response = client.chat.completions.create(
],
stream=True,
top_p=0.8,
extra_body={"top_k": 50}
extra_body={"top_k": 20}
)
for chunk in response:
if chunk.choices[0].delta:
@@ -161,8 +161,7 @@ response = client.chat.completions.create(
],
stream=True,
top_p=0.8,
extra_body={"top_k": 20},
min_p=0.1
extra_body={"top_k": 20, "min_p": 0.1}
)
for chunk in response:
if chunk.choices[0].delta:
@@ -5,7 +5,7 @@
- OS: Linux
- Python: 3.10
- XPU Model: P800
- XPU Driver Version: ≥ 5.0.21.10
- XPU Driver Version: ≥ 5.0.21.26
- XPU Firmware Version: ≥ 1.31

Verified platforms:
@@ -15,7 +15,7 @@
- OS: CentOS release 7.6 (Final)
- Python: 3.10
- XPU Model: P800 (OAM Edition)
- XPU Driver Version: 5.0.21.10
- XPU Driver Version: 5.0.21.26
- XPU Firmware Version: 1.31

**Note:** Only INTEL or Hygon CPU-based P800 (OAM Edition) servers have been verified so far; other CPU types and P800 (PCIe Edition) servers have not been tested yet.
@@ -25,9 +25,9 @@
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```
@@ -37,7 +37,7 @@ docker exec -it fastdeploy-xpu /bin/bash
### Install PaddlePaddle

```bash
python -m pip install paddlepaddle-xpu==3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
python -m pip install paddlepaddle-xpu==3.1.1 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
```

Alternatively, you can install the latest version of PaddlePaddle (not recommended)
@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### Install FastDeploy (**do NOT install from the PyPI source**)

```bash
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```

Alternatively, you can install the latest version of FastDeploy (not recommended)
@@ -63,7 +63,7 @@ python -m pip install --pre fastdeploy-xpu -i https://www.paddlepaddle.org.cn/pa
### Install PaddlePaddle

```bash
python -m pip install paddlepaddle-xpu==3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
python -m pip install paddlepaddle-xpu==3.1.1 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/
```

Alternatively, you can install the latest version of PaddlePaddle (not recommended)
@@ -183,6 +183,7 @@ for output in outputs:
* min_p(float): minimum probability threshold for a token to be considered, relative to the highest-probability token; setting it >0 filters out low-probability tokens and improves generation quality
* max_tokens(int): maximum number of tokens the model may generate (input plus output)
* min_tokens(int): minimum number of tokens the model is forced to generate, preventing premature termination
* bad_words(list[str]): list of words the model must not generate, preventing unwanted words in the output

### 2.5 fastdeploy.engine.request.RequestOutput
@@ -5,6 +5,12 @@
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.1.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
@@ -24,7 +24,11 @@ os.environ["GLOG_minloglevel"] = "2"
os.environ["AISTUDIO_LOG"] = "critical"
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.llm import LLM
from fastdeploy.utils import version
from fastdeploy.utils import version, envs
from paddleformers.utils.log import logger as pf_logger
if envs.FD_DEBUG != "1":
import logging
pf_logger.logger.setLevel(logging.INFO)

__all__ = ["LLM", "SamplingParams", "version"]
@@ -142,7 +142,7 @@ class CacheMessager:

self.gpu_id = gpu_id
self.cache_info = dict()
self.dp_rank_id = local_data_parallel_id
self.dp_rank_id = self.rank + local_data_parallel_id * self.nranks

layerwise_send_cache_thread = threading.Thread(target=self._prefill_layerwise_send_cache_thread)
layerwise_send_cache_thread.daemon = True
@@ -64,7 +64,10 @@ class PrefixCacheManager:
self.speculative_config = config.speculative_config
self.local_data_parallel_id = local_data_parallel_id

self.num_gpu_blocks = self.cache_config.prefill_kvcache_block_num
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.num_gpu_blocks = self.cache_config.total_block_num
else:
self.num_gpu_blocks = self.cache_config.prefill_kvcache_block_num
self.num_cpu_blocks = self.cache_config.num_cpu_blocks
self.gpu_free_block_list = list(range(self.num_gpu_blocks - 1, -1, -1))
if self.num_cpu_blocks > 0:
@@ -726,7 +726,10 @@ class CacheConfig:
self.block_size = 64
self.gpu_memory_utilization = 0.9
self.num_gpu_blocks_override = None
self.kv_cache_ratio = 0.75
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.kv_cache_ratio = 1.0
else:
self.kv_cache_ratio = 0.75
self.enc_dec_block_num = 2
self.prealloc_dec_block_slot_num_threshold = 5
self.cache_dtype = "bfloat16"
@@ -811,7 +814,10 @@ class CacheConfig:
self.dec_token_num = self.enc_dec_block_num * self.block_size
if self.num_gpu_blocks_override is not None:
self.total_block_num = self.num_gpu_blocks_override
self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.prefill_kvcache_block_num = self.total_block_num
else:
self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
else:
length = num_total_tokens // number_of_tasks
block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size
@@ -824,7 +830,10 @@ class CacheConfig:
reset gpu block number
"""
self.total_block_num = num_gpu_blocks
self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.prefill_kvcache_block_num = self.total_block_num
else:
self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
logger.info(
f"Reset block num, the total_block_num:{self.total_block_num},"
f" prefill_kvcache_block_num:{self.prefill_kvcache_block_num}"
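All three hunks apply the same block-accounting rule: with ENABLE_V1_KVCACHE_SCHEDULER set, every GPU block is available for prefill, otherwise only the kv_cache_ratio share is. A hedged sketch of that rule:

```python
# Hedged sketch of the prefill block accounting introduced above.
def prefill_kvcache_block_num(total_block_num: int, kv_cache_ratio: float,
                              v1_scheduler_enabled: bool) -> int:
    if v1_scheduler_enabled:
        # v1 scheduler: every GPU block can be used for prefill
        return total_block_num
    # legacy path: only a kv_cache_ratio share is reserved for prefill
    return int(total_block_num * kv_cache_ratio)

assert prefill_kvcache_block_num(1000, 0.75, False) == 750
assert prefill_kvcache_block_num(1000, 1.0, True) == 1000
```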
@@ -15,10 +15,13 @@
"""

import json
import os
from dataclasses import asdict, dataclass
from dataclasses import fields as dataclass_fields
from typing import Any, Dict, List, Optional

import paddle

from fastdeploy.config import (
CacheConfig,
EarlyStopConfig,
@@ -92,6 +95,14 @@ class EngineArgs:
"""
specifies the reasoning parser to use for extracting reasoning content from the model output
"""
tool_call_parser: str = None
"""
specifies the tool call parser to use for extracting tool call from the model output
"""
tool_parser_plugin: str = None
"""
tool parser plugin used to register user defined tool parsers
"""
enable_mm: bool = False
"""
Flags to enable multi-modal model
@@ -420,6 +431,18 @@ class EngineArgs:
help="Flag specifies the reasoning parser to use for extracting "
"reasoning content from the model output",
)
model_group.add_argument(
"--tool-call-parser",
type=str,
default=EngineArgs.tool_call_parser,
help="Flag specifies the tool call parser to use for extracting " "tool call from the model output",
)
model_group.add_argument(
"--tool-parser-plugin",
type=str,
default=EngineArgs.tool_parser_plugin,
help="tool parser plugin used to register user defined tool parsers",
)
model_group.add_argument(
"--speculative-config",
type=json.loads,
@@ -865,7 +888,13 @@ class EngineArgs:
if self.enable_chunked_prefill:
self.max_num_batched_tokens = 2048
else:
self.max_num_batched_tokens = self.max_model_len
if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
self.max_num_batched_tokens = self.max_model_len
else:
if paddle.is_compiled_with_xpu():
self.max_num_batched_tokens = self.max_model_len
else:
self.max_num_batched_tokens = 8192

all_dict = asdict(self)
all_dict["model_cfg"] = model_cfg
@@ -904,6 +933,7 @@ class EngineArgs:
mm_processor_kwargs=self.mm_processor_kwargs,
enable_mm=self.enable_mm,
reasoning_parser=self.reasoning_parser,
tool_parser=self.tool_call_parser,
splitwise_role=self.splitwise_role,
innode_prefill_ports=self.innode_prefill_ports,
max_num_partial_prefills=self.max_num_partial_prefills,
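The new default for max_num_batched_tokens depends on chunked prefill, the ENABLE_V1_KVCACHE_SCHEDULER flag, and whether Paddle was compiled for XPU; the same branch also appears in Config below. A hedged sketch of the selection logic:

```python
# Hedged sketch of the max_num_batched_tokens default chosen above.
def default_max_num_batched_tokens(enable_chunked_prefill: bool, v1_scheduler: bool,
                                   max_model_len: int, compiled_with_xpu: bool) -> int:
    if enable_chunked_prefill:
        return 2048
    if not v1_scheduler:
        return max_model_len
    # v1 KV-cache scheduler: XPU keeps max_model_len, GPU uses a fixed 8192 budget
    return max_model_len if compiled_with_xpu else 8192

assert default_max_num_batched_tokens(True, True, 32768, False) == 2048
assert default_max_num_batched_tokens(False, False, 32768, False) == 32768
assert default_max_num_batched_tokens(False, True, 32768, False) == 8192
```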
@@ -85,6 +85,7 @@ class Config:
max_long_partial_prefills: int = 1,
long_prefill_token_threshold: int = 0,
reasoning_parser: str = None,
tool_parser: str = None,
guided_decoding_backend: Optional[str] = None,
disable_any_whitespace: bool = False,
enable_logprob: bool = False,
@@ -165,6 +166,7 @@ class Config:
self.max_long_partial_prefills = max_long_partial_prefills
self.long_prefill_token_threshold = long_prefill_token_threshold
self.reasoning_parser = reasoning_parser
self.tool_parser = tool_parser
self.graph_optimization_config = graph_optimization_config
self.early_stop_config = early_stop_config
self.guided_decoding_backend = guided_decoding_backend
@@ -236,7 +238,13 @@ class Config:
if self.cache_config.enable_chunked_prefill:
self.max_num_batched_tokens = 2048
else:
self.max_num_batched_tokens = self.max_model_len
if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
self.max_num_batched_tokens = self.max_model_len
else:
if paddle.is_compiled_with_xpu():
self.max_num_batched_tokens = self.max_model_len
else:
self.max_num_batched_tokens = 8192

if self.long_prefill_token_threshold == 0:
self.long_prefill_token_threshold = int(self.max_model_len * 0.04)
@@ -284,10 +292,11 @@ class Config:
)

if not self.cache_config.enable_chunked_prefill:
assert self.max_num_batched_tokens >= self.max_model_len, (
f"max_num_batched_tokens: {self.max_num_batched_tokens} "
f"should be larger than or equal to max_model_len: {self.max_model_len}"
)
if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
assert self.max_num_batched_tokens >= self.max_model_len, (
f"max_num_batched_tokens: {self.max_num_batched_tokens} "
f"should be larger than or equal to max_model_len: {self.max_model_len}"
)
else:
assert self.max_num_batched_tokens >= self.cache_config.block_size, (
f"max_num_batched_tokens: {self.max_num_batched_tokens} "
@@ -106,6 +106,7 @@ class LLMEngine:
cfg.limit_mm_per_prompt,
cfg.mm_processor_kwargs,
cfg.enable_mm,
cfg.tool_parser,
)

self.start_queue_service()
@@ -530,6 +531,26 @@ class LLMEngine:
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

if request.get("stop_seqs_len") is not None:
stop_seqs_len = request.get("stop_seqs_len")
max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
if len(stop_seqs_len) > max_stop_seqs_num:
error_msg = (
f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})."
"Please reduce the number of stop or set a larger max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`"
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)
stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
for single_stop_seq_len in stop_seqs_len:
if single_stop_seq_len > stop_seqs_max_len:
error_msg = (
f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})."
"Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`"
)
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

if self.guided_decoding_checker is not None:
request, err_msg = self.guided_decoding_checker.schema_format(request)
if err_msg is not None:
@@ -749,10 +770,6 @@ class LLMEngine:
"""
Insert tasks to engine.
"""
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
if task.sampling_params.bad_words is not None:
task.sampling_params.update_from_tokenizer(self.data_processor.tokenizer)
# TODO: return to the scheduler
if allocated:
current_tasks = []
@@ -779,6 +796,11 @@ class LLMEngine:
self.engine_worker_queue.put_tasks((current_tasks, self.resource_manager.real_bsz))
return True

for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
if task.sampling_params.bad_words is not None:
task.sampling_params.update_from_tokenizer(self.data_processor.tokenizer)

self.resource_manager.check_and_free_block_tables()

if not isinstance(tasks, list):
@@ -24,6 +24,7 @@ from typing import Any, Dict, Optional, Union
|
||||
import numpy as np
|
||||
|
||||
from fastdeploy.engine.sampling_params import SamplingParams
|
||||
from fastdeploy.entrypoints.openai.protocol import ToolCall
|
||||
from fastdeploy.utils import data_processor_logger
|
||||
from fastdeploy.worker.output import LogprobsLists, SampleLogprobs
|
||||
|
||||
@@ -249,6 +250,7 @@ class CompletionOutput:
|
||||
draft_token_ids: list[int] = None
|
||||
text: Optional[str] = None
|
||||
reasoning_content: Optional[str] = None
|
||||
tool_calls: Optional[ToolCall] = None
|
||||
|
||||
def to_dict(self):
|
||||
"""
|
||||
|
@@ -218,20 +218,22 @@ class SamplingParams:
|
||||
prompt_token_ids = tokenizer.encode(text=prompt, add_special_tokens=False)["input_ids"]
|
||||
|
||||
if len(prompt_token_ids) != 1:
|
||||
logger.warning(
|
||||
f"Skip bad_words: {prompt}."
|
||||
f"Bad words should be a single token."
|
||||
f"Got tokens: {prompt_token_ids}."
|
||||
)
|
||||
if not add_prefix_space:
|
||||
logger.warning(
|
||||
f"Skip bad_words: <{prompt}>."
|
||||
f"Bad words should be a single token."
|
||||
f"Got tokens: {prompt_token_ids}."
|
||||
)
|
||||
continue
|
||||
|
||||
if prompt_token_ids[0] > tokenizer.vocab_size:
|
||||
logger.warning(
|
||||
f"Skip bad_words: {prompt}."
|
||||
f"All token id values should be satisfying:"
|
||||
f" 0 <= token_id < {tokenizer.vocab_size}."
|
||||
f"Got token: {prompt_token_ids}."
|
||||
)
|
||||
if not add_prefix_space:
|
||||
logger.warning(
|
||||
f"Skip bad_words: <{prompt}>."
|
||||
f"All token id values should be satisfying:"
|
||||
f" 0 <= token_id < {tokenizer.vocab_size}."
|
||||
f"Got token: {prompt_token_ids}."
|
||||
)
|
||||
continue
|
||||
|
||||
if prompt_token_ids not in self._bad_words_token_ids:
|
||||
|
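A rough sketch of the bad-words rule shown above: each entry must encode to exactly one in-vocabulary token, retried with a leading space before it is skipped. It assumes a tokenizer exposing encode(...)["input_ids"] and vocab_size as in the diff; the helper itself is illustrative.

def collect_bad_words_token_ids(tokenizer, bad_words):
    bad_words_token_ids = []
    for word in bad_words:
        for prefix in ("", " "):  # second pass adds a prefix space, mirroring add_prefix_space
            ids = tokenizer.encode(text=prefix + word, add_special_tokens=False)["input_ids"]
            if len(ids) == 1 and 0 <= ids[0] < tokenizer.vocab_size:
                bad_words_token_ids.append(ids)
                break
        else:
            print(f"Skip bad_words: <{word}> (not a single in-vocabulary token)")
    return bad_words_token_ids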
@@ -75,6 +75,7 @@ class ResourceManagerV1(ResourceManager):
|
||||
self.running: list[Request] = []
|
||||
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
|
||||
self.lock = threading.Lock()
|
||||
self.to_be_rescheduled_request_id_set = set()
|
||||
|
||||
def allocated_slots(self, request: Request):
|
||||
return len(request.block_tables) * self.config.cache_config.block_size
|
||||
@@ -97,6 +98,13 @@ class ResourceManagerV1(ResourceManager):
|
||||
def _prepare_preempt_task(self, request):
|
||||
return ScheduledPreemptTask(idx=request.idx, request_id=request.request_id)
|
||||
|
||||
def reschedule_preempt_task(self, request_id):
|
||||
with self.lock:
|
||||
if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests:
|
||||
request = self.requests[request_id]
|
||||
self.waiting.appendleft(request)
|
||||
self.to_be_rescheduled_request_id_set.remove(request_id)
|
||||
|
||||
def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_reqs):
|
||||
can_schedule = True
|
||||
while True:
|
||||
@@ -106,7 +114,7 @@ class ResourceManagerV1(ResourceManager):
|
||||
preempted_req.num_computed_tokens = 0
|
||||
preempted_req.prefill_block_num = 0
|
||||
self._free_blocks(preempted_req)
|
||||
self.waiting.appendleft(preempted_req)
|
||||
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
|
||||
preempted_reqs.append(preempted_req)
|
||||
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
|
||||
if preempted_req == request:
|
||||
@@ -134,26 +142,31 @@ class ResourceManagerV1(ResourceManager):
|
||||
|
||||
input_ids_lst = request.prompt_token_ids + request.output_token_ids
|
||||
input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
|
||||
grid_thw = []
|
||||
for one in inputs["grid_thw"]:
|
||||
if one[0] == 1:
|
||||
grid_thw.append(one)
|
||||
else:
|
||||
grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))
|
||||
|
||||
input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
|
||||
image_patch_id = inputs["image_patch_id"]
|
||||
grid_thw = paddle.to_tensor(grid_thw, dtype="int64")
|
||||
|
||||
if request.multimodal_img_boundaries is None:
|
||||
grid_thw = []
|
||||
for one in inputs["grid_thw"]:
|
||||
if one[0] == 1:
|
||||
grid_thw.append(one)
|
||||
else:
|
||||
grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))
|
||||
|
||||
grid_thw = paddle.to_tensor(grid_thw, dtype="int64")
|
||||
from fastdeploy.model_executor.ops.gpu import get_img_boundaries
|
||||
|
||||
request.multimodal_img_boundaries = get_img_boundaries(
|
||||
task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id
|
||||
).numpy()
|
||||
|
||||
grid_thw = grid_thw.numpy().reshape([-1, 3])
|
||||
inputs["grid_thw"] = grid_thw
|
||||
|
||||
grid_thw = inputs["grid_thw"]
|
||||
img_boundaries_idx = request.multimodal_img_boundaries[0]
|
||||
img_num_per_boundary = request.multimodal_img_boundaries[1]
|
||||
ori_prompt_len = img_boundaries_idx[-1].item()
|
||||
grid_thw = grid_thw.numpy().reshape([-1, 3])
|
||||
pre_end_idx = request.num_computed_tokens
|
||||
new_end_idx = pre_end_idx + num_new_tokens
|
||||
if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id:
|
||||
@@ -187,7 +200,6 @@ class ResourceManagerV1(ResourceManager):
|
||||
)
|
||||
request.num_image_end = img_num_per_boundary[new_boundary_idx]
|
||||
|
||||
request.num_image_end = img_num_per_boundary[new_boundary_idx]
|
||||
request.image_type_ids_start = np.sum(grid_thw[: request.num_image_start, 0])
|
||||
request.image_type_ids_end = np.sum(grid_thw[: request.num_image_end, 0])
|
||||
request.image_start = np.sum(np.prod(grid_thw[: request.num_image_start], axis=1))
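A worked mini-example of the grid_thw normalization used above: entries with more than one temporal frame are split into chunks of two frames, so every row becomes [1, H, W] or [2, H, W]. The concrete numbers are invented.

import numpy as np

raw_grid_thw = [[1, 32, 32], [4, 16, 16]]  # one image and one 4-frame video
grid_thw = []
for one in raw_grid_thw:
    if one[0] == 1:
        grid_thw.append(one)
    else:
        grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))

grid_thw = np.array(grid_thw).reshape([-1, 3])
# -> [[1, 32, 32], [2, 16, 16], [2, 16, 16]]
patches_per_entry = np.prod(grid_thw, axis=1)  # feeds the image_start / image_end style offsets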
|
||||
@@ -277,7 +289,7 @@ class ResourceManagerV1(ResourceManager):
|
||||
while self.waiting and token_budget > 0:
|
||||
if len(self.running) == self.max_num_seqs:
|
||||
break
|
||||
if self.config.enable_mm and self.exist_prefill(scheduled_reqs):
|
||||
if (self.config.enable_mm or paddle.is_compiled_with_xpu()) and self.exist_prefill(scheduled_reqs):
|
||||
break
|
||||
request = self.waiting[0]
|
||||
if request.status == RequestStatus.WAITING:
|
||||
@@ -381,8 +393,9 @@ class ResourceManagerV1(ResourceManager):
|
||||
return False
|
||||
|
||||
def add_request(self, request: Request) -> None:
|
||||
self.waiting.append(request)
|
||||
self.requests[request.request_id] = request
|
||||
with self.lock:
|
||||
self.waiting.append(request)
|
||||
self.requests[request.request_id] = request
|
||||
|
||||
def _free_blocks(self, request: Request):
|
||||
if self.config.cache_config.enable_prefix_caching:
|
||||
@@ -409,9 +422,20 @@ class ResourceManagerV1(ResourceManager):
|
||||
if request is None:
|
||||
# Invalid request ID.
|
||||
continue
|
||||
request.status = RequestStatus.FINISHED
|
||||
self.running.remove(request)
|
||||
self._free_blocks(request)
|
||||
if request in self.running: # normally run and finished
|
||||
self.running.remove(request)
|
||||
request.status = RequestStatus.FINISHED
|
||||
self._free_blocks(request)
|
||||
if (
|
||||
request.request_id in self.to_be_rescheduled_request_id_set
|
||||
): # finished after preempted, blocks have been recycled.
|
||||
self.to_be_rescheduled_request_id_set.remove(
|
||||
request.request_id
|
||||
) # just remove from to_be_rescheduled_request_id_set
|
||||
if (
|
||||
request in self.waiting
|
||||
): # after finished, this request still scheduled from preempted to waiting, unexpected error, should not be here
|
||||
raise RuntimeError(f"request {request.request_id} scheduled into waiting list, after finished")
|
||||
self.tasks_list[request.idx] = None
|
||||
self.stop_flags[request.idx] = True
|
||||
del self.requests[req_id]
|
||||
|
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from copy import deepcopy
|
||||
from typing import List, Literal, Union
|
||||
from urllib.parse import urlparse
|
||||
@@ -156,3 +157,7 @@ def parse_chat_messages(messages):
|
||||
|
||||
conversation.append({"role": role, "content": parsed_content})
|
||||
return conversation
|
||||
|
||||
|
||||
def random_tool_call_id() -> str:
|
||||
return f"chatcmpl-tool-{str(uuid.uuid4().hex)}"
|
||||
|
@@ -19,11 +19,12 @@ import uuid
|
||||
|
||||
import numpy as np
|
||||
|
||||
from fastdeploy import envs
|
||||
from fastdeploy.input.preprocess import InputPreprocessor
|
||||
from fastdeploy.inter_communicator import IPCSignal, ZmqClient
|
||||
from fastdeploy.metrics.work_metrics import work_process_metrics
|
||||
from fastdeploy.platforms import current_platform
|
||||
from fastdeploy.utils import EngineError, api_server_logger
|
||||
from fastdeploy.utils import EngineError, StatefulSemaphore, api_server_logger
|
||||
|
||||
|
||||
class EngineClient:
|
||||
@@ -43,6 +44,8 @@ class EngineClient:
|
||||
reasoning_parser=None,
|
||||
data_parallel_size=1,
|
||||
enable_logprob=False,
|
||||
workers=1,
|
||||
tool_parser=None,
|
||||
):
|
||||
input_processor = InputPreprocessor(
|
||||
tokenizer,
|
||||
@@ -50,6 +53,7 @@ class EngineClient:
|
||||
limit_mm_per_prompt,
|
||||
mm_processor_kwargs,
|
||||
enable_mm,
|
||||
tool_parser,
|
||||
)
|
||||
self.enable_logprob = enable_logprob
|
||||
self.enable_mm = enable_mm
|
||||
@@ -75,6 +79,7 @@ class EngineClient:
|
||||
suffix=pid,
|
||||
create=False,
|
||||
)
|
||||
self.semaphore = StatefulSemaphore((envs.FD_SUPPORT_MAX_CONNECTIONS + workers - 1) // workers)
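# Illustration with assumed numbers: FD_SUPPORT_MAX_CONNECTIONS=768 and workers=8 give ceil(768 / 8) = 96 connection slots per worker.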
|
||||
|
||||
def create_zmq_client(self, model, mode):
|
||||
"""
|
||||
@@ -144,6 +149,26 @@ class EngineClient:
|
||||
api_server_logger.error(error_msg)
|
||||
raise EngineError(error_msg, error_code=400)
|
||||
|
||||
if "stop_seqs_len" in task:
|
||||
stop_seqs_len = task["stop_seqs_len"]
|
||||
max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
|
||||
if len(stop_seqs_len) > max_stop_seqs_num:
|
||||
error_msg = (
f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num}). "
"Please reduce the number of stop sequences or set a larger max_stop_seqs_num via `FD_MAX_STOP_SEQS_NUM`."
|
||||
)
|
||||
api_server_logger.error(error_msg)
|
||||
raise EngineError(error_msg, error_code=400)
|
||||
stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
|
||||
for single_stop_seq_len in stop_seqs_len:
|
||||
if single_stop_seq_len > stop_seqs_max_len:
|
||||
error_msg = (
|
||||
f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})."
|
||||
"Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`"
|
||||
)
|
||||
api_server_logger.error(error_msg)
|
||||
raise EngineError(error_msg, error_code=400)
|
||||
|
||||
task["preprocess_end_time"] = time.time()
|
||||
preprocess_cost_time = task["preprocess_end_time"] - task["preprocess_start_time"]
|
||||
api_server_logger.info(
|
||||
|
@@ -28,6 +28,7 @@ from tqdm import tqdm
|
||||
from fastdeploy.engine.args_utils import EngineArgs
|
||||
from fastdeploy.engine.engine import LLMEngine
|
||||
from fastdeploy.engine.sampling_params import SamplingParams
|
||||
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
|
||||
|
||||
# from fastdeploy.entrypoints.chat_utils import ChatCompletionMessageParam
|
||||
from fastdeploy.utils import llm_logger, retrive_model_from_server
|
||||
@@ -73,6 +74,9 @@ class LLM:
|
||||
**kwargs,
|
||||
):
|
||||
model = retrive_model_from_server(model, revision)
|
||||
tool_parser_plugin = kwargs.get("tool_parser_plugin")
|
||||
if tool_parser_plugin:
|
||||
ToolParserManager.import_tool_parser(tool_parser_plugin)
|
||||
engine_args = EngineArgs(
|
||||
model=model,
|
||||
tokenizer=tokenizer,
|
||||
@@ -285,6 +289,10 @@ class LLM:
|
||||
self.llm_engine.add_requests(tasks, current_sampling_params, enable_thinking=enable_thinking)
|
||||
return req_ids
|
||||
|
||||
def _decode_token(self, token_id: int) -> str:
|
||||
"""Decodes a single token ID into its string representation."""
|
||||
return self.llm_engine.data_processor.process_logprob_response([token_id], clean_up_tokenization_spaces=False)
|
||||
|
||||
def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: int) -> list[dict[int, Logprob]]:
|
||||
"""
|
||||
Constructs a list of dictionaries mapping token IDs to Logprob objects,
|
||||
@@ -318,8 +326,9 @@ class LLM:
|
||||
sliced_logprobs_lists = logprobs_lists.slice_columns(1, 1 + effective_topk_logprobs)
|
||||
result = []
|
||||
for token_ids, logprobs in zip(sliced_logprobs_lists.logprob_token_ids, sliced_logprobs_lists.logprobs):
|
||||
|
||||
logprob_dict = {
|
||||
token_id: Logprob(logprob=logprob, rank=i + 1, decoded_token=None)
|
||||
token_id: Logprob(logprob=logprob, rank=i + 1, decoded_token=self._decode_token(token_id))
|
||||
for i, (token_id, logprob) in enumerate(zip(token_ids, logprobs))
|
||||
}
|
||||
result.append(logprob_dict)
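For reference, a hedged sketch of one entry of the returned list, with invented numbers: a dict keyed by candidate token id whose Logprob entries now carry a decoded_token filled in by _decode_token instead of None.

# {1012: Logprob(logprob=-0.11, rank=1, decoded_token="."),
#  1013: Logprob(logprob=-2.73, rank=2, decoded_token="!")}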
|
||||
|
@@ -14,15 +14,17 @@
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import AsyncGenerator
|
||||
from contextlib import asynccontextmanager
|
||||
from multiprocessing import current_process
|
||||
|
||||
import uvicorn
|
||||
import zmq
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
||||
from prometheus_client import CONTENT_TYPE_LATEST
|
||||
|
||||
@@ -39,15 +41,17 @@ from fastdeploy.entrypoints.openai.protocol import (
|
||||
)
|
||||
from fastdeploy.entrypoints.openai.serving_chat import OpenAIServingChat
|
||||
from fastdeploy.entrypoints.openai.serving_completion import OpenAIServingCompletion
|
||||
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
|
||||
from fastdeploy.metrics.metrics import (
|
||||
EXCLUDE_LABELS,
|
||||
cleanup_prometheus_files,
|
||||
get_filtered_metrics,
|
||||
main_process_metrics,
|
||||
)
|
||||
from fastdeploy.metrics.trace_util import inject_to_metadata, instrument
|
||||
from fastdeploy.metrics.trace_util import fd_start_span, inject_to_metadata, instrument
|
||||
from fastdeploy.utils import (
|
||||
FlexibleArgumentParser,
|
||||
StatefulSemaphore,
|
||||
api_server_logger,
|
||||
console_logger,
|
||||
is_port_available,
|
||||
@@ -60,10 +64,18 @@ parser.add_argument("--host", default="0.0.0.0", type=str, help="host to the htt
|
||||
parser.add_argument("--workers", default=1, type=int, help="number of workers")
|
||||
parser.add_argument("--metrics-port", default=8001, type=int, help="port for metrics server")
|
||||
parser.add_argument("--controller-port", default=-1, type=int, help="port for controller server")
|
||||
parser.add_argument(
|
||||
"--max-waiting-time",
|
||||
default=-1,
|
||||
type=int,
|
||||
help="max waiting time for connection, if set value -1 means no waiting time limit",
|
||||
)
|
||||
parser.add_argument("--max-concurrency", default=512, type=int, help="max concurrency")
|
||||
parser = EngineArgs.add_cli_args(parser)
|
||||
args = parser.parse_args()
|
||||
args.model = retrive_model_from_server(args.model, args.revision)
|
||||
|
||||
if args.tool_parser_plugin:
|
||||
ToolParserManager.import_tool_parser(args.tool_parser_plugin)
|
||||
llm_engine = None
|
||||
|
||||
|
||||
@@ -115,10 +127,12 @@ async def lifespan(app: FastAPI):
|
||||
args.reasoning_parser,
|
||||
args.data_parallel_size,
|
||||
args.enable_logprob,
|
||||
args.workers,
|
||||
args.tool_call_parser,
|
||||
)
|
||||
app.state.dynamic_load_weight = args.dynamic_load_weight
|
||||
chat_handler = OpenAIServingChat(engine_client, pid, args.ips)
|
||||
completion_handler = OpenAIServingCompletion(engine_client, pid, args.ips)
|
||||
chat_handler = OpenAIServingChat(engine_client, pid, args.ips, args.max_waiting_time)
|
||||
completion_handler = OpenAIServingCompletion(engine_client, pid, args.ips, args.max_waiting_time)
|
||||
engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
|
||||
engine_client.pid = pid
|
||||
app.state.engine_client = engine_client
|
||||
@@ -140,6 +154,41 @@ app = FastAPI(lifespan=lifespan)
|
||||
instrument(app)
|
||||
|
||||
|
||||
MAX_CONCURRENT_CONNECTIONS = (args.max_concurrency + args.workers - 1) // args.workers
|
||||
connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def connection_manager():
|
||||
"""
|
||||
async context manager for connection manager
|
||||
"""
|
||||
try:
|
||||
await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
|
||||
yield
|
||||
except asyncio.TimeoutError:
|
||||
api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
|
||||
raise HTTPException(
|
||||
status_code=429, detail=f"Too many requests, current max concurrency is {args.max_concurrency}"
|
||||
)
|
||||
|
||||
|
||||
def wrap_streaming_generator(original_generator: AsyncGenerator):
|
||||
"""
|
||||
Wrap an async generator to release the connection semaphore when the generator is finished.
|
||||
"""
|
||||
|
||||
async def wrapped_generator():
|
||||
try:
|
||||
async for chunk in original_generator:
|
||||
yield chunk
|
||||
finally:
|
||||
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
|
||||
connection_semaphore.release()
|
||||
|
||||
return wrapped_generator
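A standalone sketch of the admission-control pattern above, using asyncio.Semaphore in place of FastDeploy's StatefulSemaphore: try to grab a slot almost immediately or signal 429, and for streaming responses release the slot only once the generator is drained.

import asyncio

limiter = asyncio.Semaphore(4)  # e.g. max_concurrency // workers

async def try_acquire_slot() -> bool:
    try:
        await asyncio.wait_for(limiter.acquire(), timeout=0.001)
        return True
    except asyncio.TimeoutError:
        return False  # the caller turns this into an HTTP 429 response

def release_when_drained(stream):
    async def wrapped():
        try:
            async for chunk in stream:
                yield chunk
        finally:
            limiter.release()
    return wrapped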
|
||||
|
||||
|
||||
# TODO: pass the real engine state and fetch status via pid
|
||||
@app.get("/health")
|
||||
def health(request: Request) -> Response:
|
||||
@@ -198,20 +247,30 @@ async def create_chat_completion(request: ChatCompletionRequest):
|
||||
"""
|
||||
Create a chat completion for the provided prompt and parameters.
|
||||
"""
|
||||
api_server_logger.info(f"Chat Received request: {request.model_dump_json()}")
|
||||
if app.state.dynamic_load_weight:
|
||||
status, msg = app.state.engine_client.is_workers_alive()
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
inject_to_metadata(request)
|
||||
generator = await app.state.chat_handler.create_chat_completion(request)
|
||||
try:
|
||||
async with connection_manager():
|
||||
inject_to_metadata(request)
|
||||
generator = await app.state.chat_handler.create_chat_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
connection_semaphore.release()
|
||||
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
|
||||
return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
|
||||
elif isinstance(generator, ChatCompletionResponse):
|
||||
connection_semaphore.release()
|
||||
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
|
||||
if isinstance(generator, ErrorResponse):
|
||||
return JSONResponse(content=generator.model_dump(), status_code=generator.code)
|
||||
|
||||
elif isinstance(generator, ChatCompletionResponse):
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
|
||||
return StreamingResponse(content=generator, media_type="text/event-stream")
|
||||
except HTTPException as e:
|
||||
api_server_logger.error(f"Error in chat completion: {str(e)}")
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
|
||||
@app.post("/v1/completions")
|
||||
@@ -219,18 +278,26 @@ async def create_completion(request: CompletionRequest):
|
||||
"""
|
||||
Create a completion for the provided prompt and parameters.
|
||||
"""
|
||||
api_server_logger.info(f"Completion Received request: {request.model_dump_json()}")
|
||||
if app.state.dynamic_load_weight:
|
||||
status, msg = app.state.engine_client.is_workers_alive()
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
|
||||
generator = await app.state.completion_handler.create_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
return JSONResponse(content=generator.model_dump(), status_code=generator.code)
|
||||
elif isinstance(generator, CompletionResponse):
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
|
||||
return StreamingResponse(content=generator, media_type="text/event-stream")
|
||||
try:
|
||||
async with connection_manager():
|
||||
generator = await app.state.completion_handler.create_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
connection_semaphore.release()
|
||||
return JSONResponse(content=generator.model_dump(), status_code=generator.code)
|
||||
elif isinstance(generator, CompletionResponse):
|
||||
connection_semaphore.release()
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
except HTTPException as e:
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
|
||||
@app.get("/update_model_weight")
|
||||
@@ -270,6 +337,7 @@ def launch_api_server() -> None:
|
||||
|
||||
api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}")
|
||||
api_server_logger.info(f"args: {args.__dict__}")
|
||||
fd_start_span("FD_START")
|
||||
|
||||
try:
|
||||
uvicorn.run(
|
||||
|
@@ -72,7 +72,6 @@ class ToolCall(BaseModel):
|
||||
id: str = None
|
||||
type: Literal["function"] = "function"
|
||||
function: FunctionCall
|
||||
index: int
|
||||
|
||||
|
||||
class DeltaFunctionCall(BaseModel):
|
||||
@@ -96,6 +95,18 @@ class DeltaToolCall(BaseModel):
|
||||
function: Optional[DeltaFunctionCall] = None
|
||||
|
||||
|
||||
class ExtractedToolCallInformation(BaseModel):
|
||||
# indicate if tools were called
|
||||
tools_called: bool
|
||||
|
||||
# extracted tool calls
|
||||
tool_calls: Optional[list[ToolCall]] = None
|
||||
|
||||
# content - per OpenAI spec, content AND tool calls can be returned rarely
|
||||
# But some models will do this intentionally
|
||||
content: Optional[str] = None
|
||||
|
||||
|
||||
class FunctionDefinition(BaseModel):
|
||||
"""
|
||||
Function definition.
|
||||
@@ -126,6 +137,8 @@ class ChatMessage(BaseModel):
|
||||
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
|
||||
prompt_token_ids: Optional[List[int]] = None
|
||||
completion_token_ids: Optional[List[int]] = None
|
||||
text_after_process: Optional[str] = None
|
||||
raw_prediction: Optional[str] = None
|
||||
|
||||
|
||||
class ChatCompletionResponseChoice(BaseModel):
|
||||
@@ -183,6 +196,8 @@ class DeltaMessage(BaseModel):
|
||||
completion_token_ids: Optional[List[int]] = None
|
||||
reasoning_content: Optional[str] = None
|
||||
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
|
||||
text_after_process: Optional[str] = None
|
||||
raw_prediction: Optional[str] = None
|
||||
|
||||
|
||||
class ChatCompletionResponseStreamChoice(BaseModel):
|
||||
@@ -219,6 +234,8 @@ class CompletionResponseChoice(BaseModel):
|
||||
text: str
|
||||
prompt_token_ids: Optional[List[int]] = None
|
||||
completion_token_ids: Optional[List[int]] = None
|
||||
text_after_process: Optional[str] = None
|
||||
raw_prediction: Optional[str] = None
|
||||
arrival_time: Optional[float] = None
|
||||
logprobs: Optional[CompletionLogprobs] = None
|
||||
reasoning_content: Optional[str] = None
|
||||
@@ -261,6 +278,8 @@ class CompletionResponseStreamChoice(BaseModel):
|
||||
logprobs: Optional[CompletionLogprobs] = None
|
||||
prompt_token_ids: Optional[List[int]] = None
|
||||
completion_token_ids: Optional[List[int]] = None
|
||||
text_after_process: Optional[str] = None
|
||||
raw_prediction: Optional[str] = None
|
||||
reasoning_content: Optional[str] = None
|
||||
finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None
|
||||
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
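As a usage sketch of the tool-call models added to this file, a parser would hand back something like the following. The weather call is invented, and the field values only follow the class definitions above.

from fastdeploy.entrypoints.openai.protocol import (
    ExtractedToolCallInformation,
    FunctionCall,
    ToolCall,
)

info = ExtractedToolCallInformation(
    tools_called=True,
    tool_calls=[
        ToolCall(
            id="chatcmpl-tool-0001",
            type="function",
            index=0,
            function=FunctionCall(name="get_weather", arguments='{"location": "Beijing"}'),
        )
    ],
    content="",  # per the comment above, content can accompany tool calls but is usually empty
)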
|
||||
|
@@ -49,10 +49,11 @@ class OpenAIServingChat:
|
||||
OpenAI-style chat completions serving
|
||||
"""
|
||||
|
||||
def __init__(self, engine_client, pid, ips):
|
||||
def __init__(self, engine_client, pid, ips, max_waiting_time):
|
||||
self.engine_client = engine_client
|
||||
self.pid = pid
|
||||
self.master_ip = ips
|
||||
self.max_waiting_time = max_waiting_time
|
||||
self.host_ip = get_host_ip()
|
||||
if self.master_ip is not None:
|
||||
if isinstance(self.master_ip, list):
|
||||
@@ -77,31 +78,45 @@ class OpenAIServingChat:
|
||||
api_server_logger.error(err_msg)
|
||||
return ErrorResponse(message=err_msg, code=400)
|
||||
|
||||
if request.user is not None:
|
||||
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
|
||||
else:
|
||||
request_id = f"chatcmpl-{uuid.uuid4()}"
|
||||
api_server_logger.info(f"create chat completion request: {request_id}")
|
||||
|
||||
try:
|
||||
current_req_dict = request.to_dict_for_infer(request_id)
|
||||
current_req_dict["arrival_time"] = time.time()
|
||||
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
|
||||
if isinstance(prompt_token_ids, np.ndarray):
|
||||
prompt_token_ids = prompt_token_ids.tolist()
|
||||
except Exception as e:
|
||||
return ErrorResponse(code=400, message=str(e))
|
||||
if self.max_waiting_time < 0:
|
||||
await self.engine_client.semaphore.acquire()
|
||||
else:
|
||||
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
|
||||
api_server_logger.debug(f"current waiting request {self.engine_client.semaphore.status()}")
|
||||
|
||||
del current_req_dict
|
||||
|
||||
if request.stream:
|
||||
return self.chat_completion_stream_generator(request, request_id, request.model, prompt_token_ids)
|
||||
else:
|
||||
if request.user is not None:
|
||||
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
|
||||
else:
|
||||
request_id = f"chatcmpl-{uuid.uuid4()}"
|
||||
api_server_logger.info(f"create chat completion request: {request_id}")
|
||||
text_after_process = None
|
||||
try:
|
||||
return await self.chat_completion_full_generator(request, request_id, request.model, prompt_token_ids)
|
||||
current_req_dict = request.to_dict_for_infer(request_id)
|
||||
current_req_dict["arrival_time"] = time.time()
|
||||
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
|
||||
text_after_process = current_req_dict.get("text_after_process")
|
||||
if isinstance(prompt_token_ids, np.ndarray):
|
||||
prompt_token_ids = prompt_token_ids.tolist()
|
||||
except Exception as e:
|
||||
return ErrorResponse(code=400, message=str(e))
|
||||
|
||||
del current_req_dict
|
||||
|
||||
if request.stream:
|
||||
return self.chat_completion_stream_generator(
|
||||
request, request_id, request.model, prompt_token_ids, text_after_process
|
||||
)
|
||||
else:
|
||||
try:
|
||||
return await self.chat_completion_full_generator(
|
||||
request, request_id, request.model, prompt_token_ids, text_after_process
|
||||
)
|
||||
except Exception as e:
|
||||
return ErrorResponse(code=400, message=str(e))
|
||||
except Exception:
|
||||
return ErrorResponse(code=408, message=f"Request queueing time exceeded {self.max_waiting_time}")
|
||||
|
||||
def _create_streaming_error_response(self, message: str) -> str:
|
||||
error_response = ErrorResponse(
|
||||
code=400,
|
||||
@@ -115,6 +130,7 @@ class OpenAIServingChat:
|
||||
request_id: str,
|
||||
model_name: str,
|
||||
prompt_token_ids: list(),
|
||||
text_after_process: str,
|
||||
):
|
||||
"""
|
||||
Streaming chat completion generator.
|
||||
@@ -125,6 +141,7 @@ class OpenAIServingChat:
|
||||
previous_num_tokens = 0
|
||||
num_prompt_tokens = 0
|
||||
num_choices = 1
|
||||
tool_called = False
|
||||
max_streaming_response_tokens = (
|
||||
request.max_streaming_response_tokens
|
||||
if request.max_streaming_response_tokens is not None
|
||||
@@ -207,6 +224,7 @@ class OpenAIServingChat:
|
||||
)
|
||||
if request.return_token_ids:
|
||||
choice.delta.prompt_token_ids = list(prompt_token_ids)
|
||||
choice.delta.text_after_process = text_after_process
|
||||
chunk = ChatCompletionStreamResponse(
|
||||
id=request_id,
|
||||
object=chunk_object_type,
|
||||
@@ -222,25 +240,32 @@ class OpenAIServingChat:
|
||||
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens),
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
|
||||
api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}")
|
||||
first_iteration = False
|
||||
|
||||
output = res["outputs"]
|
||||
delta_text = output["text"]
|
||||
output_top_logprobs = output["top_logprobs"]
|
||||
previous_num_tokens += len(output["token_ids"])
|
||||
logprobs_res: Optional[LogProbs] = None
|
||||
if request.logprobs and output_top_logprobs is not None:
|
||||
logprobs_res = self._create_chat_logprobs(
|
||||
output_top_logprobs, request.logprobs, request.top_logprobs
|
||||
)
|
||||
if not res["finished"]:
|
||||
if "reasoning_delta_message" in output:
|
||||
delta_message = output["reasoning_delta_message"]
|
||||
elif "tool_delta_message" in output:
|
||||
delta_message = output["tool_delta_message"]
|
||||
if delta_message is not None and delta_message.tool_calls:
|
||||
tool_called = True
|
||||
else:
|
||||
delta_message = DeltaMessage(content=delta_text)
|
||||
else:
|
||||
delta_message = DeltaMessage(content=delta_text)
|
||||
|
||||
previous_num_tokens += len(output["token_ids"])
|
||||
delta_message = DeltaMessage(
|
||||
content=delta_text,
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
prompt_token_ids=None,
|
||||
completion_token_ids=None,
|
||||
tool_calls=output.get("tool_call_content", []),
|
||||
)
|
||||
if delta_message is None:
|
||||
continue
|
||||
|
||||
choice = ChatCompletionResponseStreamChoice(
|
||||
index=0,
|
||||
@@ -248,6 +273,7 @@ class OpenAIServingChat:
|
||||
logprobs=logprobs_res,
|
||||
arrival_time=arrival_time,
|
||||
)
|
||||
|
||||
if res["finished"]:
|
||||
num_choices -= 1
|
||||
work_process_metrics.e2e_request_latency.observe(
|
||||
@@ -257,10 +283,7 @@ class OpenAIServingChat:
|
||||
max_tokens = request.max_completion_tokens or request.max_tokens
|
||||
if has_no_token_limit or previous_num_tokens != max_tokens:
|
||||
choice.finish_reason = "stop"
|
||||
if (
|
||||
self.engine_client.reasoning_parser == "ernie_x1"
|
||||
and output.get("finish_reason", "") == "tool_calls"
|
||||
):
|
||||
if tool_called:
|
||||
choice.finish_reason = "tool_calls"
|
||||
else:
|
||||
choice.finish_reason = "length"
|
||||
@@ -270,6 +293,7 @@ class OpenAIServingChat:
|
||||
|
||||
if request.return_token_ids:
|
||||
choice.delta.completion_token_ids = list(output["token_ids"])
|
||||
choice.delta.raw_prediction = output.get("raw_prediction")
|
||||
if include_continuous_usage:
|
||||
chunk.usage = UsageInfo(
|
||||
prompt_tokens=num_prompt_tokens,
|
||||
@@ -281,6 +305,9 @@ class OpenAIServingChat:
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
chunk.choices = choices
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
# Log the final streamed chunk
|
||||
if res["finished"]:
|
||||
api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
|
||||
choices = []
|
||||
|
||||
if choices:
|
||||
@@ -310,6 +337,8 @@ class OpenAIServingChat:
|
||||
yield f"data: {error_data}\n\n"
|
||||
finally:
|
||||
dealer.close()
|
||||
self.engine_client.semaphore.release()
|
||||
api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
async def chat_completion_full_generator(
|
||||
@@ -318,6 +347,7 @@ class OpenAIServingChat:
|
||||
request_id: str,
|
||||
model_name: str,
|
||||
prompt_token_ids: list(),
|
||||
text_after_process: str,
|
||||
):
|
||||
"""
|
||||
Full chat completion generator.
|
||||
@@ -384,6 +414,8 @@ class OpenAIServingChat:
|
||||
break
|
||||
finally:
|
||||
dealer.close()
|
||||
self.engine_client.semaphore.release()
|
||||
api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
|
||||
|
||||
choices = []
|
||||
output = final_res["outputs"]
|
||||
@@ -391,9 +423,11 @@ class OpenAIServingChat:
|
||||
role="assistant",
|
||||
content=output["text"],
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
tool_calls=output.get("tool_call"),
|
||||
prompt_token_ids=prompt_token_ids if request.return_token_ids else None,
|
||||
completion_token_ids=completion_token_ids if request.return_token_ids else None,
|
||||
text_after_process=text_after_process if request.return_token_ids else None,
|
||||
raw_prediction=output.get("raw_prediction") if request.return_token_ids else None,
|
||||
)
|
||||
logprobs_full_res = None
|
||||
if logprob_contents:
|
||||
@@ -409,7 +443,7 @@ class OpenAIServingChat:
|
||||
max_tokens = request.max_completion_tokens or request.max_tokens
|
||||
if has_no_token_limit or previous_num_tokens != max_tokens:
|
||||
choice.finish_reason = "stop"
|
||||
if self.engine_client.reasoning_parser == "ernie_x1" and output.get("finish_reason", "") == "tool_calls":
|
||||
if output.get("tool_call"):
|
||||
choice.finish_reason = "tool_calls"
|
||||
else:
|
||||
choice.finish_reason = "length"
|
||||
@@ -427,13 +461,15 @@ class OpenAIServingChat:
|
||||
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
|
||||
)
|
||||
work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
|
||||
return ChatCompletionResponse(
|
||||
res = ChatCompletionResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=choices,
|
||||
usage=usage,
|
||||
)
|
||||
api_server_logger.info(f"Chat response: {res.model_dump_json()}")
|
||||
return res
|
||||
|
||||
def _create_chat_logprobs(
|
||||
self,
|
||||
|
@@ -40,11 +40,12 @@ from fastdeploy.worker.output import LogprobsLists
|
||||
|
||||
|
||||
class OpenAIServingCompletion:
|
||||
def __init__(self, engine_client, pid, ips):
|
||||
def __init__(self, engine_client, pid, ips, max_waiting_time):
|
||||
self.engine_client = engine_client
|
||||
self.pid = pid
|
||||
self.master_ip = ips
|
||||
self.host_ip = get_host_ip()
|
||||
self.max_waiting_time = max_waiting_time
|
||||
if self.master_ip is not None:
|
||||
if isinstance(self.master_ip, list):
|
||||
self.master_ip = self.master_ip[0]
|
||||
@@ -99,6 +100,14 @@ class OpenAIServingCompletion:
|
||||
|
||||
api_server_logger.info(f"start inference for request {num_choices}")
|
||||
prompt_batched_token_ids = []
|
||||
text_after_process_list = []
|
||||
try:
|
||||
if self.max_waiting_time < 0:
|
||||
await self.engine_client.semaphore.acquire()
|
||||
else:
|
||||
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
|
||||
except Exception:
|
||||
return ErrorResponse(code=408, message=f"Request queueing time exceeded {self.max_waiting_time}")
|
||||
try:
|
||||
for idx, prompt in enumerate(request_prompts):
|
||||
request_id_idx = f"{request_id}-{idx}"
|
||||
@@ -108,6 +117,7 @@ class OpenAIServingCompletion:
|
||||
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
|
||||
if isinstance(prompt_token_ids, np.ndarray):
|
||||
prompt_token_ids = prompt_token_ids.tolist()
|
||||
text_after_process_list.append(current_req_dict.get("text_after_process"))
|
||||
prompt_batched_token_ids.append(prompt_token_ids)
|
||||
except Exception as e:
|
||||
return ErrorResponse(message=str(e), code=400)
|
||||
@@ -122,6 +132,7 @@ class OpenAIServingCompletion:
|
||||
created_time=created_time,
|
||||
model_name=request.model,
|
||||
prompt_batched_token_ids=prompt_batched_token_ids,
|
||||
text_after_process_list=text_after_process_list,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
@@ -132,6 +143,7 @@ class OpenAIServingCompletion:
|
||||
created_time=created_time,
|
||||
model_name=request.model,
|
||||
prompt_batched_token_ids=prompt_batched_token_ids,
|
||||
text_after_process_list=text_after_process_list,
|
||||
)
|
||||
except Exception as e:
|
||||
return ErrorResponse(code=400, message=str(e))
|
||||
@@ -147,6 +159,7 @@ class OpenAIServingCompletion:
|
||||
created_time: int,
|
||||
model_name: str,
|
||||
prompt_batched_token_ids: list(),
|
||||
text_after_process_list: list(),
|
||||
):
|
||||
"""
|
||||
Process the full completion request with multiple choices.
|
||||
@@ -207,8 +220,7 @@ class OpenAIServingCompletion:
|
||||
valid_results[rid] = data
|
||||
num_choices -= 1
|
||||
break
|
||||
|
||||
return self.request_output_to_completion_response(
|
||||
res = self.request_output_to_completion_response(
|
||||
final_res_batch=valid_results,
|
||||
request=request,
|
||||
request_id=request_id,
|
||||
@@ -216,13 +228,26 @@ class OpenAIServingCompletion:
|
||||
model_name=model_name,
|
||||
prompt_batched_token_ids=prompt_batched_token_ids,
|
||||
completion_batched_token_ids=completion_batched_token_ids,
|
||||
text_after_process_list=text_after_process_list,
|
||||
)
|
||||
api_server_logger.info(f"Completion response: {res.model_dump_json()}")
|
||||
return res
|
||||
except Exception as e:
|
||||
api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
if dealer is not None:
|
||||
dealer.close()
|
||||
self.engine_client.semaphore.release()
|
||||
|
||||
def calc_finish_reason(self, max_tokens, token_num, output, tool_called):
|
||||
if max_tokens is None or token_num != max_tokens:
|
||||
if tool_called or output.get("tool_call"):
|
||||
return "tool_calls"
|
||||
else:
|
||||
return "stop"
|
||||
else:
|
||||
return "length"
|
||||
|
||||
async def completion_stream_generator(
|
||||
self,
|
||||
@@ -232,6 +257,7 @@ class OpenAIServingCompletion:
|
||||
created_time: int,
|
||||
model_name: str,
|
||||
prompt_batched_token_ids: list(),
|
||||
text_after_process_list: list(),
|
||||
):
|
||||
"""
|
||||
Process the stream completion request.
|
||||
@@ -245,6 +271,7 @@ class OpenAIServingCompletion:
|
||||
output_tokens = [0] * num_choices
|
||||
inference_start_time = [0] * num_choices
|
||||
first_iteration = [True] * num_choices
|
||||
tool_called = False
|
||||
max_streaming_response_tokens = (
|
||||
request.max_streaming_response_tokens
|
||||
if request.max_streaming_response_tokens is not None
|
||||
@@ -290,11 +317,15 @@ class OpenAIServingCompletion:
|
||||
index=idx,
|
||||
text="",
|
||||
prompt_token_ids=list(prompt_batched_token_ids[idx]),
|
||||
text_after_process=text_after_process_list[idx],
|
||||
completion_token_ids=None,
|
||||
)
|
||||
],
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
api_server_logger.info(
|
||||
f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}"
|
||||
)
|
||||
first_iteration[idx] = False
|
||||
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
@@ -312,30 +343,64 @@ class OpenAIServingCompletion:
|
||||
if request.logprobs and output_top_logprobs is not None:
|
||||
logprobs_res = self._create_completion_logprobs(output_top_logprobs, request.logprobs, 0)
|
||||
|
||||
choices.append(
|
||||
CompletionResponseStreamChoice(
|
||||
index=idx,
|
||||
text=output["text"],
|
||||
prompt_token_ids=None,
|
||||
completion_token_ids=output.get("token_ids") if request.return_token_ids else None,
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
arrival_time=arrival_time,
|
||||
logprobs=logprobs_res,
|
||||
)
|
||||
)
|
||||
if res["finished"]:
|
||||
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
|
||||
chunk.choices[0].finish_reason = "stop"
|
||||
if (
|
||||
self.engine_client.reasoning_parser == "ernie_x1"
|
||||
and output.get("finish_reason", "") == "tool_calls"
|
||||
):
|
||||
chunk.choices[0].finish_reason = "tool_calls"
|
||||
else:
|
||||
chunk.choices[0].finish_reason = "length"
|
||||
|
||||
output_tokens[idx] += 1
|
||||
base_kwargs = {
|
||||
"index": idx,
|
||||
"completion_token_ids": output.get("token_ids") if request.return_token_ids else None,
|
||||
"arrival_time": arrival_time,
|
||||
"logprobs": logprobs_res,
|
||||
}
|
||||
delta_message_kwargs = None
|
||||
if not res["finished"]:
|
||||
if "reasoning_delta_message" in output:
|
||||
reasoning_delta_message = output["reasoning_delta_message"]
|
||||
if reasoning_delta_message is not None:
|
||||
delta_message_kwargs = {
|
||||
**base_kwargs,
|
||||
"text": reasoning_delta_message.content or "",
|
||||
"reasoning_content": reasoning_delta_message.reasoning_content,
|
||||
}
|
||||
elif "tool_delta_message" in output:
|
||||
tool_delta_message = output["tool_delta_message"]
|
||||
if tool_delta_message is not None:
|
||||
delta_message_kwargs = {
|
||||
**base_kwargs,
|
||||
"text": tool_delta_message.content or "",
|
||||
"tool_calls": tool_delta_message.tool_calls,
|
||||
}
|
||||
if tool_delta_message.tool_calls:
|
||||
tool_called = True
|
||||
else:
|
||||
delta_message_kwargs = {
|
||||
**base_kwargs,
|
||||
"text": output["text"],
|
||||
}
|
||||
else:
|
||||
delta_message_kwargs = {
|
||||
**base_kwargs,
|
||||
"text": output["text"],
|
||||
}
|
||||
|
||||
if delta_message_kwargs is None:
|
||||
continue
|
||||
delta_message = CompletionResponseStreamChoice(**delta_message_kwargs)
|
||||
|
||||
choices.append(delta_message)
|
||||
output_tokens[idx] += 1
|
||||
|
||||
if res["finished"]:
|
||||
choices[-1].finish_reason = self.calc_finish_reason(
|
||||
request.max_tokens, output_tokens[idx], output, tool_called
|
||||
)
|
||||
send_idx = output.get("send_idx")
|
||||
# Only log when send_idx is explicitly 0
|
||||
if send_idx == 0 and not request.return_token_ids:
|
||||
chunk_temp = chunk
|
||||
chunk_temp.choices = choices
|
||||
api_server_logger.info(
|
||||
f"Completion Streaming response send_idx 0: {chunk_temp.model_dump_json()}"
|
||||
)
|
||||
del chunk_temp
|
||||
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
chunk = CompletionStreamResponse(
|
||||
@@ -361,6 +426,7 @@ class OpenAIServingCompletion:
|
||||
),
|
||||
)
|
||||
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}")
|
||||
if choices:
|
||||
chunk.choices = choices
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
@@ -372,6 +438,7 @@ class OpenAIServingCompletion:
|
||||
del request
|
||||
if dealer is not None:
|
||||
dealer.close()
|
||||
self.engine_client.semaphore.release()
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
def request_output_to_completion_response(
|
||||
@@ -383,6 +450,7 @@ class OpenAIServingCompletion:
|
||||
model_name: str,
|
||||
prompt_batched_token_ids: list(),
|
||||
completion_batched_token_ids: list(),
|
||||
text_after_process_list: list(),
|
||||
) -> CompletionResponse:
|
||||
choices: List[CompletionResponseChoice] = []
|
||||
num_prompt_tokens = 0
|
||||
@@ -421,16 +489,20 @@ class OpenAIServingCompletion:
|
||||
token_ids = output["token_ids"]
|
||||
output_text = output["text"]
|
||||
|
||||
finish_reason = self.calc_finish_reason(request.max_tokens, final_res["output_token_ids"], output, False)
|
||||
|
||||
choice_data = CompletionResponseChoice(
|
||||
token_ids=token_ids,
|
||||
index=len(choices),
|
||||
text=output_text,
|
||||
prompt_token_ids=prompt_token_ids if request.return_token_ids else None,
|
||||
completion_token_ids=completion_token_ids if request.return_token_ids else None,
|
||||
raw_prediction=output.get("raw_prediction") if request.return_token_ids else None,
|
||||
text_after_process=text_after_process_list[idx] if request.return_token_ids else None,
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
tool_calls=output.get("tool_call"),
|
||||
logprobs=aggregated_logprobs,
|
||||
finish_reason=None,
|
||||
finish_reason=finish_reason,
|
||||
)
|
||||
choices.append(choice_data)
|
||||
|
||||
|
24
fastdeploy/entrypoints/openai/tool_parsers/__init__.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""
|
||||
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
from .abstract_tool_parser import ToolParser, ToolParserManager
|
||||
from .ernie_x1_tool_parser import ErnieX1ToolParser
|
||||
|
||||
__all__ = [
|
||||
"ToolParser",
|
||||
"ToolParserManager",
|
||||
"ErnieX1ToolParser",
|
||||
]
|
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
import os
|
||||
from collections.abc import Sequence
|
||||
from functools import cached_property
|
||||
from typing import Callable, Optional, Union
|
||||
|
||||
from fastdeploy.entrypoints.openai.protocol import (
|
||||
ChatCompletionRequest,
|
||||
DeltaMessage,
|
||||
ExtractedToolCallInformation,
|
||||
)
|
||||
from fastdeploy.utils import data_processor_logger, import_from_path, is_list_of
|
||||
|
||||
|
||||
class ToolParser:
|
||||
"""
|
||||
Abstract ToolParser class that should not be used directly. Provided
|
||||
properties and methods should be used in
|
||||
derived classes.
|
||||
"""
|
||||
|
||||
def __init__(self, tokenizer):
|
||||
self.prev_tool_call_arr: list[dict] = []
|
||||
# the index of the tool call that is currently being parsed
|
||||
self.current_tool_id: int = -1
|
||||
self.current_tool_name_sent: bool = False
|
||||
self.streamed_args_for_tool: list[str] = []
|
||||
|
||||
self.model_tokenizer = tokenizer
|
||||
|
||||
@cached_property
|
||||
def vocab(self) -> dict[str, int]:
|
||||
# NOTE: Only PreTrainedTokenizerFast is guaranteed to have .vocab
|
||||
# whereas all tokenizers have .get_vocab()
|
||||
return self.model_tokenizer.get_vocab()
|
||||
|
||||
def adjust_request(self, request: ChatCompletionRequest) -> ChatCompletionRequest:
|
||||
"""
|
||||
Static method that used to adjust the request parameters.
|
||||
"""
|
||||
return request
|
||||
|
||||
def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) -> ExtractedToolCallInformation:
|
||||
"""
|
||||
Static method that should be implemented for extracting tool calls from
|
||||
a complete model-generated string.
|
||||
Used for non-streaming responses where we have the entire model response
|
||||
available before sending to the client.
|
||||
Static because it's stateless.
|
||||
"""
|
||||
raise NotImplementedError("AbstractToolParser.extract_tool_calls has not been implemented!")
|
||||
|
||||
def extract_tool_calls_streaming(
|
||||
self,
|
||||
previous_text: str,
|
||||
current_text: str,
|
||||
delta_text: str,
|
||||
previous_token_ids: Sequence[int],
|
||||
current_token_ids: Sequence[int],
|
||||
delta_token_ids: Sequence[int],
|
||||
request: ChatCompletionRequest,
|
||||
) -> Union[DeltaMessage, None]:
|
||||
"""
|
||||
Instance method that should be implemented for extracting tool calls
|
||||
from an incomplete response; for use when handling tool calls and
|
||||
streaming. Has to be an instance method because it requires state -
|
||||
the current tokens/diffs, but also the information about what has
|
||||
previously been parsed and extracted (see constructor)
|
||||
"""
|
||||
raise NotImplementedError("AbstractToolParser.extract_tool_calls_streaming has not been " "implemented!")
|
||||
|
||||
|
||||
class ToolParserManager:
|
||||
tool_parsers: dict[str, type] = {}
|
||||
|
||||
@classmethod
|
||||
def get_tool_parser(cls, name) -> type:
|
||||
"""
|
||||
Get tool parser by name which is registered by `register_module`.
|
||||
|
||||
Raise a KeyError exception if the name is not registered.
|
||||
"""
|
||||
if name in cls.tool_parsers:
|
||||
return cls.tool_parsers[name]
|
||||
|
||||
raise KeyError(f"tool helper: '{name}' not found in tool_parsers")
|
||||
|
||||
@classmethod
|
||||
def _register_module(
|
||||
cls, module: type, module_name: Optional[Union[str, list[str]]] = None, force: bool = True
|
||||
) -> None:
|
||||
if not issubclass(module, ToolParser):
|
||||
raise TypeError(f"module must be subclass of ToolParser, but got {type(module)}")
|
||||
if module_name is None:
|
||||
module_name = module.__name__
|
||||
if isinstance(module_name, str):
|
||||
module_name = [module_name]
|
||||
for name in module_name:
|
||||
if not force and name in cls.tool_parsers:
|
||||
existed_module = cls.tool_parsers[name]
|
||||
raise KeyError(f"{name} is already registered " f"at {existed_module.__module__}")
|
||||
cls.tool_parsers[name] = module
|
||||
|
||||
@classmethod
|
||||
def register_module(
|
||||
cls, name: Optional[Union[str, list[str]]] = None, force: bool = True, module: Union[type, None] = None
|
||||
) -> Union[type, Callable]:
|
||||
"""
|
||||
Register module with the given name or name list. it can be used as a
|
||||
decoder(with module as None) or normal function(with module as not
|
||||
None).
|
||||
"""
|
||||
if not isinstance(force, bool):
|
||||
raise TypeError(f"force must be a boolean, but got {type(force)}")
|
||||
|
||||
# raise the error ahead of time
|
||||
if not (name is None or isinstance(name, str) or is_list_of(name, str)):
|
||||
raise TypeError("name must be None, an instance of str, or a sequence of str, " f"but got {type(name)}")
|
||||
|
||||
# use it as a normal method: x.register_module(module=SomeClass)
|
||||
if module is not None:
|
||||
cls._register_module(module=module, module_name=name, force=force)
|
||||
return module
|
||||
|
||||
# use it as a decorator: @x.register_module()
|
||||
def _register(module):
|
||||
cls._register_module(module=module, module_name=name, force=force)
|
||||
return module
|
||||
|
||||
return _register
|
||||
|
||||
@classmethod
|
||||
def import_tool_parser(cls, plugin_path: str) -> None:
|
||||
"""
|
||||
Import a user-defined tool parser by the path of the tool parser define
|
||||
file.
|
||||
"""
|
||||
module_name = os.path.splitext(os.path.basename(plugin_path))[0]
|
||||
|
||||
try:
|
||||
import_from_path(module_name, plugin_path)
|
||||
except Exception:
|
||||
data_processor_logger.exception("Failed to load module '%s' from %s.", module_name, plugin_path)
|
||||
return
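A short usage sketch for the registry above; the parser name "my_parser" and the DummyParser class are invented for illustration.

from fastdeploy.entrypoints.openai.tool_parsers import ToolParser, ToolParserManager

@ToolParserManager.register_module("my_parser")
class DummyParser(ToolParser):
    def extract_tool_calls(self, model_output, request):
        raise NotImplementedError

assert ToolParserManager.get_tool_parser("my_parser") is DummyParser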
|
@@ -0,0 +1,356 @@
|
||||
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import re
|
||||
import uuid
|
||||
from collections.abc import Sequence
|
||||
from typing import Union
|
||||
|
||||
import partial_json_parser
|
||||
|
||||
|
||||
def random_tool_call_id() -> str:
|
||||
"""Generate a random tool call ID"""
|
||||
return f"chatcmpl-tool-{str(uuid.uuid4().hex)}"
|
||||
|
||||
|
||||
from fastdeploy.entrypoints.openai.protocol import (
|
||||
ChatCompletionRequest,
|
||||
DeltaFunctionCall,
|
||||
DeltaMessage,
|
||||
DeltaToolCall,
|
||||
ExtractedToolCallInformation,
|
||||
FunctionCall,
|
||||
ToolCall,
|
||||
)
|
||||
from fastdeploy.entrypoints.openai.tool_parsers.abstract_tool_parser import (
|
||||
ToolParser,
|
||||
ToolParserManager,
|
||||
)
|
||||
from fastdeploy.utils import data_processor_logger
|
||||
|
||||
|
||||
@ToolParserManager.register_module("ernie_x1")
|
||||
class ErnieX1ToolParser(ToolParser):
|
||||
"""
|
||||
Tool parser for Ernie model version 4.5.1.
|
||||
This parser handles tool calls with newline formats.
|
||||
"""
|
||||
|
||||
def __init__(self, tokenizer):
|
||||
super().__init__(tokenizer)
|
||||
|
||||
self.prev_tool_call_arr: list[dict] = []
|
||||
self.current_tool_id: int = -1
|
||||
self.current_tool_name_sent: bool = False
|
||||
self.streamed_args_for_tool: list[str] = [] # map what has been streamed for each tool so far to a list
|
||||
self.buffer: str = "" # buffer for accumulating unprocessed streaming content
|
||||
self.bracket_counts: dict = {"total_l": 0, "total_r": 0} # track bracket counts in streamed deltas
|
||||
self.tool_call_start_token: str = "<tool_call>"
|
||||
self.tool_call_end_token: str = "</tool_call>"
|
||||
|
||||
self.tool_call_start_token_id = self.vocab.get(self.tool_call_start_token)
|
||||
self.tool_call_end_token_id = self.vocab.get(self.tool_call_end_token)
|
||||
if self.tool_call_start_token_id is None or self.tool_call_end_token_id is None:
|
||||
raise RuntimeError(
|
||||
"Hermes 2 Pro Tool parser could not locate tool call start/end " "tokens in the tokenizer!"
|
||||
)
|
||||
|
||||
if not self.model_tokenizer:
|
||||
raise ValueError(
|
||||
"The model tokenizer must be passed to the ToolCallParser constructor during construction."
|
||||
)
|
||||
|
||||
def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) -> ExtractedToolCallInformation:
|
||||
"""
|
||||
Extract the tool calls from a complete model response.
|
||||
Supports XML-style formats with newlines:
|
||||
- XML format: <think>\n...\n</think>\n\n\n<tool_call>\n{...}\n</tool_call>\n...
|
||||
|
||||
Handles boundary cases:
|
||||
1. Only name and partial arguments: {"name": "get_weather", "arguments": {"location": "北京"
|
||||
2. Only partial name: {"name": "get_we
|
||||
3. Only name and arguments field without content: {"name": "get_weather", "argume
|
||||
"""
|
||||
|
||||
try:
|
||||
tool_calls = []
|
||||
|
||||
# Check for invalid <response> tags before tool calls
|
||||
if re.search(r"<response>[\s\S]*?</response>\s*(?=<tool_call>)", model_output):
|
||||
data_processor_logger.error("Invalid format: <response> tags found before <tool_call>")
|
||||
return ExtractedToolCallInformation(tools_called=False, content=model_output)
|
||||
|
||||
function_call_arr = []
|
||||
remaining_text = model_output
|
||||
|
||||
while True:
|
||||
# Find the next tool_call block
|
||||
tool_call_pos = remaining_text.find("<tool_call>")
|
||||
if tool_call_pos == -1:
|
||||
break
|
||||
|
||||
# Extract the content after the tool_call opening tag
|
||||
tool_content_start = tool_call_pos + len("<tool_call>")
|
||||
tool_content_end = remaining_text.find("</tool_call>", tool_content_start)
|
||||
|
||||
tool_json = ""
|
||||
if tool_content_end == -1:
|
||||
# Handle an unclosed tool_call block (truncated output)
|
||||
tool_json = remaining_text[tool_content_start:].strip()
|
||||
remaining_text = "" # 没有更多内容需要处理
|
||||
else:
|
||||
# Handle a complete tool_call block
|
||||
tool_json = remaining_text[tool_content_start:tool_content_end].strip()
|
||||
remaining_text = remaining_text[tool_content_end + len("</tool_call>") :]
|
||||
|
||||
if not tool_json:
|
||||
continue
|
||||
|
||||
# 处理JSON内容
|
||||
tool_json = tool_json.strip()
|
||||
if not tool_json.startswith("{"):
|
||||
tool_json = "{" + tool_json
|
||||
if not tool_json.endswith("}"):
|
||||
tool_json = tool_json + "}"
|
||||
|
||||
try:
|
||||
# 首先尝试标准JSON解析
|
||||
try:
|
||||
tool_data = json.loads(tool_json)
|
||||
|
||||
if isinstance(tool_data, dict) and "name" in tool_data and "arguments" in tool_data:
|
||||
function_call_arr.append(
|
||||
{
|
||||
"name": tool_data["name"],
|
||||
"arguments": tool_data["arguments"],
|
||||
"_is_complete": True, # 明确标记为完整解析
|
||||
}
|
||||
)
|
||||
continue
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# 标准解析失败时尝试partial_json_parser
|
||||
from partial_json_parser.core.options import Allow
|
||||
|
||||
try:
|
||||
tool_data = {}
|
||||
flags = Allow.ALL & ~Allow.STR
|
||||
|
||||
# 解析name字段
|
||||
name_match = re.search(r'"name"\s*:\s*"([^"]*)"', tool_json)
|
||||
if name_match:
|
||||
tool_data["name"] = name_match.group(1)
|
||||
|
||||
# 解析arguments字段
|
||||
args_match = re.search(r'"arguments"\s*:\s*(\{.*)', tool_json)
|
||||
if args_match:
|
||||
try:
|
||||
tool_data["arguments"] = partial_json_parser.loads(args_match.group(1), flags=flags)
|
||||
except:
|
||||
tool_data["arguments"] = None
|
||||
|
||||
if isinstance(tool_data, dict):
|
||||
function_call_arr.append(
|
||||
{
|
||||
"name": tool_data.get("name", ""),
|
||||
"arguments": tool_data.get("arguments", {}),
|
||||
"_is_partial": True, # 标记为部分解析
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
data_processor_logger.debug(f"Failed to parse tool call: {str(e)}")
|
||||
continue
|
||||
except Exception as e:
|
||||
data_processor_logger.debug(f"Failed to parse tool call: {str(e)}")
|
||||
continue
|
||||
|
||||
if not function_call_arr:
|
||||
data_processor_logger.error("No valid tool calls found")
|
||||
return ExtractedToolCallInformation(tools_called=False, content=model_output)
|
||||
|
||||
tool_calls = []
|
||||
all_complete = True # 初始设为True,只要有一个不完整就变为False
|
||||
|
||||
for tool_call in function_call_arr:
|
||||
# 记录工具调用解析状态
|
||||
is_complete = tool_call.get("_is_complete", False)
|
||||
is_partial = tool_call.get("_is_partial", False)
|
||||
|
||||
# 只要有一个不完整就认为整体不完整
|
||||
if not is_complete or is_partial:
|
||||
all_complete = False
|
||||
|
||||
# 处理参数序列化
|
||||
tool_args = tool_call.get("arguments", {})
|
||||
if not isinstance(tool_args, dict):
|
||||
tool_args = {}
|
||||
|
||||
try:
|
||||
args_str = json.dumps(tool_args, ensure_ascii=False) if tool_args else "{}"
|
||||
except:
|
||||
args_str = "{}"
|
||||
|
||||
tool_calls.append(
|
||||
ToolCall(
|
||||
type="function",
|
||||
id=random_tool_call_id(),
|
||||
function=FunctionCall(
|
||||
name=tool_call.get("name", ""),
|
||||
arguments=args_str,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
# 只有当所有工具调用都明确标记为complete时才返回tools_called=True
|
||||
return ExtractedToolCallInformation(
|
||||
tools_called=all_complete, tool_calls=tool_calls if tool_calls else None, content=""
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
data_processor_logger.error(f"Error in extracting tool call from response: {str(e)}")
|
||||
return ExtractedToolCallInformation(tools_called=False, tool_calls=None, content=model_output)
|
||||
|
||||
def extract_tool_calls_streaming(
|
||||
self,
|
||||
previous_text: str,
|
||||
current_text: str,
|
||||
delta_text: str,
|
||||
previous_token_ids: Sequence[int],
|
||||
current_token_ids: Sequence[int],
|
||||
delta_token_ids: Sequence[int],
|
||||
request: dict,
|
||||
) -> Union[DeltaMessage, None]:
|
||||
|
||||
if self.tool_call_start_token_id not in current_token_ids:
|
||||
return DeltaMessage(content=delta_text)
|
||||
# 忽略空chunk
|
||||
if len(delta_text.strip()) == 0:
|
||||
return None
|
||||
|
||||
try:
|
||||
delta = None
|
||||
# 使用buffer累积delta_text内容
|
||||
self.buffer += delta_text
|
||||
|
||||
# 处理增量中的新tool_call开始
|
||||
if "<tool_call>" in delta_text:
|
||||
self.current_tool_id = (
|
||||
max(self.current_tool_id, 0) if self.current_tool_id == -1 else self.current_tool_id + 1
|
||||
)
|
||||
self.current_tool_name_sent = False
|
||||
if len(self.streamed_args_for_tool) <= self.current_tool_id:
|
||||
self.streamed_args_for_tool.append("")
|
||||
data_processor_logger.debug(f"New tool call started with ID: {self.current_tool_id}")
|
||||
|
||||
# 1. 尝试解析name字段
|
||||
if not self.current_tool_name_sent and '"name"' in self.buffer:
|
||||
name_match = re.search(r'"name"\s*:\s*"([^"]*)"', self.buffer)
|
||||
if name_match:
|
||||
name = name_match.group(1)
|
||||
if name:
|
||||
delta = DeltaMessage(
|
||||
tool_calls=[
|
||||
DeltaToolCall(
|
||||
index=self.current_tool_id,
|
||||
type="function",
|
||||
id=random_tool_call_id(),
|
||||
function=DeltaFunctionCall(name=name).model_dump(exclude_none=True),
|
||||
)
|
||||
]
|
||||
)
|
||||
print("delta name:", delta)
|
||||
# 删除已处理的name部分
|
||||
self.buffer = self.buffer[name_match.end() :]
|
||||
self.current_tool_name_sent = True
|
||||
return delta
|
||||
# 2. 尝试解析arguments字段
|
||||
if '"arguments"' in self.buffer:
|
||||
args_match = re.search(r'"arguments"\s*:\s*(\{.*)', self.buffer)
|
||||
if args_match:
|
||||
args_content = args_match.group(1)
|
||||
print("args_content:", args_content)
|
||||
try:
|
||||
# 检查是否到达arguments结尾(括号完全匹配)
|
||||
if "}}" in args_content:
|
||||
print("delta_text (partial):", delta_text)
|
||||
# 逐个字符检查括号匹配状态
|
||||
matched_pos = -1
|
||||
for i, ch in enumerate(delta_text):
|
||||
if ch == "{":
|
||||
self.bracket_counts["total_l"] += 1
|
||||
elif ch == "}":
|
||||
self.bracket_counts["total_r"] += 1
|
||||
|
||||
if self.bracket_counts["total_l"] == self.bracket_counts["total_r"]: # 括号完全匹配
|
||||
matched_pos = i
|
||||
break
|
||||
|
||||
if matched_pos >= 0:
|
||||
# 找到匹配点,清理buffer并返回
|
||||
truncate_text = delta_text[: matched_pos + 1]
|
||||
print("truncate_text:", truncate_text)
|
||||
delta = DeltaMessage(
|
||||
tool_calls=[
|
||||
DeltaToolCall(
|
||||
index=self.current_tool_id,
|
||||
function=DeltaFunctionCall(arguments=truncate_text).model_dump(
|
||||
exclude_none=True
|
||||
),
|
||||
)
|
||||
]
|
||||
)
|
||||
self.buffer = self.buffer[args_match.end() :]
|
||||
print(delta)
|
||||
return delta
|
||||
else:
|
||||
# 没有完全匹配,继续累积
|
||||
return None
|
||||
else:
|
||||
# 增量返回当前可解析的部分
|
||||
for ch in delta_text:
|
||||
if ch == "{":
|
||||
self.bracket_counts["total_l"] += 1
|
||||
elif ch == "}":
|
||||
self.bracket_counts["total_r"] += 1
|
||||
delta = DeltaMessage(
|
||||
tool_calls=[
|
||||
DeltaToolCall(
|
||||
index=self.current_tool_id,
|
||||
function=DeltaFunctionCall(arguments=delta_text).model_dump(exclude_none=True),
|
||||
)
|
||||
]
|
||||
)
|
||||
print("delta argument (partial):", delta)
|
||||
print(
|
||||
f"Current bracket counts - left: {self.bracket_counts['total_l']}, right: {self.bracket_counts['total_r']}"
|
||||
)
|
||||
return delta
|
||||
except Exception as e:
|
||||
data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}")
|
||||
return None
|
||||
if "</tool_call>" in self.buffer:
|
||||
end_pos = self.buffer.find("</tool_call>")
|
||||
self.buffer = self.buffer[end_pos + len("</tool_call>") :]
|
||||
|
||||
# 完成当前工具调用处理
|
||||
self.streamed_args_for_tool.append("")
|
||||
|
||||
return delta
|
||||
|
||||
except Exception as e:
|
||||
data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}")
|
||||
return None
|
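Editor's note (not part of the diff): the streaming path above decides when an `arguments` fragment is complete purely by brace counting. A minimal standalone sketch of that rule, for illustration only — it is not the parser class itself, which additionally needs a tokenizer containing the `<tool_call>` / `</tool_call>` special tokens:

```python
# Standalone sketch of the brace-balancing rule used by extract_tool_calls_streaming:
# emit argument text up to the character where "{" and "}" counts balance out.
def balanced_prefix(deltas: list[str]) -> str:
    left = right = 0
    out = []
    for delta in deltas:
        for i, ch in enumerate(delta):
            if ch == "{":
                left += 1
            elif ch == "}":
                right += 1
            if left and left == right:       # brackets fully matched
                out.append(delta[: i + 1])
                return "".join(out)
        out.append(delta)                    # still unbalanced, keep accumulating
    return "".join(out)


print(balanced_prefix(['{"location": ', '"Beijing"', "}"]))  # {"location": "Beijing"}
```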
137  fastdeploy/entrypoints/openai/tool_parsers/utils.py  Normal file
@@ -0,0 +1,137 @@
"""
|
||||
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
import json
|
||||
from json import JSONDecodeError, JSONDecoder
|
||||
from typing import Any
|
||||
|
||||
import partial_json_parser
|
||||
from partial_json_parser.core.options import Allow
|
||||
|
||||
|
||||
def find_common_prefix(s1: str, s2: str) -> str:
|
||||
"""
|
||||
Finds a common prefix that is shared between two strings, if there is one.
|
||||
Order of arguments is NOT important.
|
||||
|
||||
This function is provided as a UTILITY for extracting information from JSON
|
||||
generated by partial_json_parser, to help in ensuring that the right tokens
|
||||
are returned in streaming, so that close-quotes, close-brackets and
|
||||
close-braces are not returned prematurely.
|
||||
|
||||
e.g. find_common_prefix('{"fruit": "ap"}', '{"fruit": "apple"}') ->
|
||||
'{"fruit": "ap'
|
||||
"""
|
||||
prefix = ""
|
||||
min_length = min(len(s1), len(s2))
|
||||
for i in range(0, min_length):
|
||||
if s1[i] == s2[i]:
|
||||
prefix += s1[i]
|
||||
else:
|
||||
break
|
||||
return prefix
|
||||
|
||||
|
||||
def find_common_suffix(s1: str, s2: str) -> str:
|
||||
"""
|
||||
Finds a common suffix shared between two strings, if there is one. Order of
|
||||
arguments is NOT important.
|
||||
Stops when the suffix ends OR it hits an alphanumeric character
|
||||
|
||||
e.g. find_common_suffix('{"fruit": "ap"}', '{"fruit": "apple"}') -> '"}'
|
||||
"""
|
||||
suffix = ""
|
||||
min_length = min(len(s1), len(s2))
|
||||
for i in range(1, min_length + 1):
|
||||
if s1[-i] == s2[-i] and not s1[-i].isalnum():
|
||||
suffix = s1[-i] + suffix
|
||||
else:
|
||||
break
|
||||
return suffix
|
||||
|
||||
|
||||
def extract_intermediate_diff(curr: str, old: str) -> str:
|
||||
"""
|
||||
Given two strings, extract the difference in the middle between two strings
|
||||
that are known to have a common prefix and/or suffix.
|
||||
|
||||
This function is provided as a UTILITY for extracting information from JSON
|
||||
generated by partial_json_parser, to help in ensuring that the right tokens
|
||||
are returned in streaming, so that close-quotes, close-brackets and
|
||||
close-braces are not returned prematurely. The order of arguments IS
|
||||
important - the new version of the partially-parsed JSON must be the first
|
||||
argument, and the secnod argument must be from the previous generation.
|
||||
|
||||
What it returns, is tokens that should be streamed to the client.
|
||||
|
||||
e.g. extract_intermediate_diff('{"fruit": "apple"}', '{"fruit": "ap"}')
|
||||
-> 'ple'
|
||||
|
||||
"""
|
||||
suffix = find_common_suffix(curr, old)
|
||||
|
||||
old = old[::-1].replace(suffix[::-1], "", 1)[::-1]
|
||||
prefix = find_common_prefix(curr, old)
|
||||
diff = curr
|
||||
if len(suffix):
|
||||
diff = diff[::-1].replace(suffix[::-1], "", 1)[::-1]
|
||||
|
||||
if len(prefix):
|
||||
# replace the prefix only once in case it's mirrored
|
||||
diff = diff.replace(prefix, "", 1)
|
||||
|
||||
return diff
|
||||
|
||||
|
||||
def find_all_indices(string: str, substring: str) -> list[int]:
|
||||
"""
|
||||
Find all (starting) indices of a substring in a given string. Useful for
|
||||
tool call extraction
|
||||
"""
|
||||
indices = []
|
||||
index = -1
|
||||
while True:
|
||||
index = string.find(substring, index + 1)
|
||||
if index == -1:
|
||||
break
|
||||
indices.append(index)
|
||||
return indices
|
||||
|
||||
|
||||
# partial_json_parser doesn't support extra data and
|
||||
# JSONDecoder.raw_decode doesn't support partial JSON
|
||||
def partial_json_loads(input_str: str, flags: Allow) -> tuple[Any, int]:
|
||||
try:
|
||||
return (partial_json_parser.loads(input_str, flags), len(input_str))
|
||||
except JSONDecodeError as e:
|
||||
if "Extra data" in e.msg:
|
||||
dec = JSONDecoder()
|
||||
return dec.raw_decode(input_str)
|
||||
raise
|
||||
|
||||
|
||||
def is_complete_json(input_str: str) -> bool:
|
||||
try:
|
||||
json.loads(input_str)
|
||||
return True
|
||||
except JSONDecodeError:
|
||||
return False
|
||||
|
||||
|
||||
def consume_space(i: int, s: str) -> int:
|
||||
while i < len(s) and s[i].isspace():
|
||||
i += 1
|
||||
return i
|
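Editor's note (not part of the diff): a small usage sketch of the helpers above, assuming `partial_json_parser` is installed and the module is importable at the path shown in this file's header:

```python
from partial_json_parser.core.options import Allow

from fastdeploy.entrypoints.openai.tool_parsers.utils import (
    extract_intermediate_diff,
    is_complete_json,
    partial_json_loads,
)

prev = '{"location": "Beijing"'
curr = '{"location": "Beijing", "unit": "celsius"}'

# Only the newly generated middle part should be streamed to the client.
print(extract_intermediate_diff(curr, prev))           # e.g. ', "unit": "celsius"}'
print(is_complete_json(prev), is_complete_json(curr))  # False True

# Tolerant parse of a truncated fragment, plus how many characters were consumed.
obj, consumed = partial_json_loads(prev, Allow.ALL)
print(obj, consumed)                                   # {'location': 'Beijing'} 22
```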
@@ -80,6 +80,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
    "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
    # enable kv cache block scheduler v1 (no need for kv_cache_ratio)
    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")),
    # set trace attribute job_id.
    "FD_JOB_ID": lambda: os.getenv("FD_JOB_ID"),
    # support max connections
    "FD_SUPPORT_MAX_CONNECTIONS": lambda: 768,
}
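Editor's note (not part of the diff): the entries above follow the existing pattern of `fastdeploy/envs.py`: each value is a zero-argument callable, so environment variables are read lazily at access time rather than at import time. A standalone sketch of that pattern (the dict here only mirrors the shape; it is not the actual module):

```python
import os
from typing import Any, Callable

# Same shape as the environment_variables dict above (names copied from the diff).
environment_variables: dict[str, Callable[[], Any]] = {
    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")),
    "FD_JOB_ID": lambda: os.getenv("FD_JOB_ID"),
    "FD_SUPPORT_MAX_CONNECTIONS": lambda: 768,
}

os.environ["ENABLE_V1_KVCACHE_SCHEDULER"] = "1"
print(environment_variables["ENABLE_V1_KVCACHE_SCHEDULER"]())  # 1, read at call time
```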
@@ -43,14 +43,16 @@ class ErnieProcessor(BaseDataProcessor):
        pad_token_id (int): the token ID used for padding.
    """

    def __init__(self, model_name_or_path, reasoning_parser_obj=None):
    def __init__(self, model_name_or_path, reasoning_parser_obj=None, tool_parser_obj=None):

        self.model_name_or_path = model_name_or_path
        data_processor_logger.info(f"model_name_or_path: {model_name_or_path}")
        self._init_config()

        self.decode_status = dict()
        self.tool_parser_dict = dict()
        self.thinking_parser_dict = dict()
        self.reasoning_end_dict = dict()
        self._load_tokenizer()
        data_processor_logger.info(
            f"tokenizer information: bos_token is {self.tokenizer.bos_token} \
@@ -63,6 +65,7 @@ class ErnieProcessor(BaseDataProcessor):
        self.reasoning_parser = None
        if reasoning_parser_obj:
            self.reasoning_parser = reasoning_parser_obj(self.tokenizer)
        self.tool_parser_obj = tool_parser_obj

    def _init_config(self):
        self.use_hf_tokenizer = int(envs.FD_USE_HF_TOKENIZER) == 1
@@ -156,7 +159,7 @@ class ErnieProcessor(BaseDataProcessor):
        if request.get("prompt"):
            prompt = request.get("prompt")
            prompt = prompt[0] if isinstance(prompt, list) else prompt

            request["text_after_process"] = prompt
            tokens = self.tokenizer.tokenize(prompt)
            token_ids = self.tokenizer.convert_tokens_to_ids(tokens)
            request["prompt_token_ids"] = token_ids
@@ -204,6 +207,12 @@ class ErnieProcessor(BaseDataProcessor):
            response_dict.outputs.reasoning_content = reasoning_content
        else:
            response_dict.outputs.text = full_text
        if self.tool_parser_obj:
            tool_parser = self.tool_parser_obj(self.tokenizer)
            tool_call_info = tool_parser.extract_tool_calls(full_text, response_dict)
            if tool_call_info.tools_called:
                response_dict.outputs.tool_calls = tool_call_info.tool_calls
                response_dict.outputs.text = tool_call_info.content
        data_processor_logger.info(f"req_id:{req_id}, token_ids: {token_ids}")
        if response_dict.outputs.text == "" and response_dict.outputs.reasoning_content == "":
            return None
@@ -244,12 +253,21 @@ class ErnieProcessor(BaseDataProcessor):
        delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
        if is_end:
            full_text = previous_texts + delta_text
            if enable_thinking and self.reasoning_parser:
            if self.reasoning_parser and (
                enable_thinking or self.reasoning_parser.__class__.__name__ == "ErnieX1ReasoningParser"
            ):
                reasoning_content, text = self.reasoning_parser.extract_reasoning_content(full_text, response_dict)
                response_dict["outputs"]["text"] = text
                response_dict["outputs"]["reasoning_content"] = reasoning_content
            else:
                response_dict["outputs"]["text"] = full_text
            if self.tool_parser_obj:
                tool_parser = self.tool_parser_obj(self.tokenizer)
                tool_call_info = tool_parser.extract_tool_calls(full_text, response_dict)
                if tool_call_info.tools_called:
                    response_dict["outputs"]["tool_call"] = tool_call_info.tool_calls
                    response_dict["outputs"]["text"] = tool_call_info.content
            response_dict["outputs"]["raw_prediction"] = full_text
            data_processor_logger.info(f"req_id:{req_id}, decode_status: {self.decode_status[req_id]}")
            del self.decode_status[req_id]
            return response_dict
@@ -273,8 +291,13 @@ class ErnieProcessor(BaseDataProcessor):
        if token_ids[-1] == self.tokenizer.eos_token_id:
            token_ids = token_ids[:-1]
        delta_text, previous_token_ids, previous_texts = self.ids2tokens(token_ids, req_id)
        if enable_thinking and self.reasoning_parser:
            reasoning_content, text = self.reasoning_parser.extract_reasoning_content_streaming(
        response_dict["outputs"]["raw_prediction"] = delta_text
        if (
            self.reasoning_parser
            and req_id not in self.reasoning_end_dict
            and (enable_thinking or self.reasoning_parser.__class__.__name__ == "ErnieX1ReasoningParser")
        ):
            reasoning_delta_message = self.reasoning_parser.extract_reasoning_content_streaming(
                previous_texts,
                previous_texts + delta_text,
                delta_text,
@@ -282,13 +305,31 @@ class ErnieProcessor(BaseDataProcessor):
                previous_token_ids + token_ids,
                token_ids,
            )
            response_dict["outputs"]["text"] = text
            response_dict["outputs"]["reasoning_content"] = reasoning_content
        else:
            response_dict["outputs"]["text"] = delta_text
            response_dict["outputs"]["reasoning_delta_message"] = reasoning_delta_message
            if self.reasoning_parser.is_reasoning_end(previous_token_ids + token_ids):
                self.reasoning_end_dict[req_id] = True
        if self.tool_parser_obj and req_id in self.reasoning_end_dict:
            if req_id not in self.tool_parser_dict:
                self.tool_parser_dict[req_id] = self.tool_parser_obj(self.tokenizer)
            tool_parser = self.tool_parser_dict[req_id]
            tool_call = tool_parser.extract_tool_calls_streaming(
                previous_texts,
                previous_texts + delta_text,
                delta_text,
                previous_token_ids,
                previous_token_ids + token_ids,
                token_ids,
                response_dict,
            )
            response_dict["outputs"]["tool_delta_message"] = tool_call
        response_dict["outputs"]["text"] = delta_text
        if is_end:
            data_processor_logger.info(f"req_id:{req_id}, decode_status: {self.decode_status[req_id]}")
            del self.decode_status[req_id]
            if req_id in self.tool_parser_dict:
                del self.tool_parser_dict[req_id]
            if req_id in self.reasoning_end_dict:
                del self.reasoning_end_dict[req_id]
        return response_dict

    def messages2ids(self, request_or_messages):
@@ -310,7 +351,7 @@ class ErnieProcessor(BaseDataProcessor):
            split_special_tokens=False,
            add_special_tokens=False,
        )

        request_or_messages["text_after_process"] = spliced_message
        req_id = None
        if isinstance(request_or_messages, dict):
            req_id = request_or_messages.get("request_id", None)
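Editor's note (not part of the diff): the streaming branch above only starts tool-call parsing once the reasoning parser reports that the reasoning phase has ended for that request, and it keeps one parser instance per request. A minimal standalone sketch of that per-request bookkeeping (illustrative only; the dict names mirror the ones used above):

```python
# reasoning_end_dict: req_id -> True once is_reasoning_end() fires
# tool_parser_dict:  req_id -> per-request streaming tool parser instance
reasoning_end_dict: dict = {}
tool_parser_dict: dict = {}


def on_stream_delta(req_id: str, reasoning_ended: bool, make_tool_parser):
    if reasoning_ended:
        reasoning_end_dict[req_id] = True
    if req_id not in reasoning_end_dict:
        return None                      # still thinking: no tool-call parsing yet
    if req_id not in tool_parser_dict:
        tool_parser_dict[req_id] = make_tool_parser()
    return tool_parser_dict[req_id]      # reused for every later chunk of this request


def on_stream_end(req_id: str) -> None:
    tool_parser_dict.pop(req_id, None)
    reasoning_end_dict.pop(req_id, None)
```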
@@ -14,8 +14,6 @@
# limitations under the License.
"""

# cipher_token=WjI1fQOvhN # do not edit this line

import os
import re
from shutil import copyfile

@@ -34,6 +34,7 @@ class ErnieMoEVLProcessor(ErnieProcessor):
        limit_mm_per_prompt=None,
        mm_processor_kwargs=None,
        reasoning_parser_obj=None,
        tool_parser_obj=None,
    ):
        self.use_hf_tokenizer = False

@@ -53,7 +54,9 @@ class ErnieMoEVLProcessor(ErnieProcessor):
        self.image_patch_id = self.ernie_processor.image_patch_id
        self.spatial_conv_size = self.ernie_processor.spatial_conv_size

        self.tool_parser_dict = dict()
        self.decode_status = dict()
        self.reasoning_end_dict = dict()
        self._load_tokenizer()
        self.eos_token_ids = [self.tokenizer.eos_token_id]
        self.eos_token_id_len = len(self.eos_token_ids)
@@ -62,6 +65,7 @@ class ErnieMoEVLProcessor(ErnieProcessor):
        self.reasoning_parser = None
        if reasoning_parser_obj:
            self.reasoning_parser = reasoning_parser_obj(self.tokenizer)
        self.tool_parser_obj = tool_parser_obj

        # Generation config
        try:
@@ -211,6 +215,7 @@ class ErnieMoEVLProcessor(ErnieProcessor):
            self._check_mm_limits(multimodal_data)
            images = multimodal_data.get("image", None)
            videos = multimodal_data.get("video", None)
            request["text_after_process"] = request.get("prompt")
            outputs = self.ernie_processor.text2ids(request["prompt"], images, videos)
        elif request.get("messages"):
            messages = request["messages"]
@@ -494,16 +494,15 @@ class DataProcessor:
        """
        if self.tokenizer.chat_template is None:
            raise ValueError("This model does not support chat_template.")

        prompt_token_str = (
            self.tokenizer.apply_chat_template(
                request,
                tokenize=False,
                add_generation_prompt=request.get("add_generation_prompt", True),
            )
            .replace("<|image@placeholder|>", "")
            .replace("<|video@placeholder|>", "")
        prompt_token_template = self.tokenizer.apply_chat_template(
            request,
            tokenize=False,
            add_generation_prompt=request.get("add_generation_prompt", True),
        )
        prompt_token_str = prompt_token_template.replace("<|image@placeholder|>", "").replace(
            "<|video@placeholder|>", ""
        )
        request["text_after_process"] = prompt_token_template
        tokens = self.tokenizer.tokenize(prompt_token_str)
        token_ids = self.tokenizer.convert_tokens_to_ids(tokens)
        data_processor_logger.info(
@@ -88,9 +88,6 @@ def process_image_data(image_data, mime_type, url):


def http_to_pil_image(url):
    """http_to_pil_image"""
    if is_public_url(url) and int(os.getenv("DOWNLOAD_WITH_TP_SERVER", "0")):
        return http_to_pil_image_with_tp_server(url)

    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Failed to download the image from URL.")
@@ -106,60 +103,6 @@ def http_to_pil_image(url):
    return pil_image


def http_to_pil_image_with_tp_server(url, retry_time=6):
    """The CNAP platform has no public network access, so images are downloaded through the TP proxy service."""
    proxies = [
        {"http": "http://10.229.197.142:8807"},
        {"http": "http://10.229.197.161:8804"},
        {"http": "http://10.229.198.143:8804"},
        {"http": "http://10.122.108.164:8807"},
        {"http": "http://10.122.108.165:8807"},
        {"http": "http://10.122.108.166:8807"},
        {"http": "http://10.122.108.168:8801"},
        {"http": "http://10.122.150.146:8802"},
        {"http": "http://10.122.150.158:8802"},
        {"http": "http://10.122.150.164:8801"},
        {"http": "http://10.143.51.38:8813"},
        {"http": "http://10.143.103.42:8810"},
        {"http": "http://10.143.194.45:8804"},
        {"http": "http://10.143.226.25:8801"},
        {"http": "http://10.143.236.12:8807"},
        {"http": "http://10.143.238.36:8807"},
        {"http": "http://10.144.71.30:8807"},
        {"http": "http://10.144.73.16:8804"},
        {"http": "http://10.144.138.36:8801"},
        {"http": "http://10.144.152.40:8810"},
        {"http": "http://10.144.199.29:8810"},
        {"http": "http://10.144.251.29:8813"},
    ]
    headers = {
        "X-Tp-Authorization": "Basic RVJOSUVMaXRlVjpFUk5JRUxpdGVWXzFxYXo0cmZ2M2VkYzV0Z2Iyd3N4LWJmZS10cA==",
        "scheme": "https",
    }

    new_url = url.replace("https://", "http://") if url.startswith("https://") else url

    # The proxies can be unstable, so retry on failure
    for idx in range(retry_time):
        try:
            response = requests.get(new_url, headers=headers, proxies=random.choice(proxies))
            if response.status_code == 200:
                image_data = io.BytesIO(response.content)

                mime_type = response.headers.get("Content-Type")
                if mime_type is None:
                    mime_type, _ = mimetypes.guess_type(url)

                data_processor_logger.info(f"Detected MIME type: {mime_type}")  # debug info
                pil_image = process_image_data(image_data, mime_type, url)

                return pil_image
        except Exception as e:
            data_processor_logger.error(f"Failed to download the image, idx: {idx}, URL: {url}, error: {e}")

    raise Exception(f"Failed to download the image from URL: {url}")


def base64_to_pil_image(base64_string):
    """base64_to_pil_image"""
    image_bytes = base64.b64decode(base64_string)
@@ -18,6 +18,7 @@ from typing import Any, Dict, Optional

from fastdeploy.config import ErnieArchitectures
from fastdeploy.engine.config import ModelConfig
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
from fastdeploy.reasoning import ReasoningParserManager


@@ -48,6 +49,7 @@ class InputPreprocessor:
        limit_mm_per_prompt: Optional[Dict[str, Any]] = None,
        mm_processor_kwargs: Optional[Dict[str, Any]] = None,
        enable_mm: bool = False,
        tool_parser: str = None,
    ) -> None:

        self.model_name_or_path = model_name_or_path
@@ -55,6 +57,7 @@ class InputPreprocessor:
        self.enable_mm = enable_mm
        self.limit_mm_per_prompt = limit_mm_per_prompt
        self.mm_processor_kwargs = mm_processor_kwargs
        self.tool_parser = tool_parser

    def create_processor(self):
        """
@@ -68,8 +71,11 @@ class InputPreprocessor:
            DataProcessor or MultiModalRegistry.Processor (Union[DataProcessor, MultiModalRegistry.Processor]): the data processor.
        """
        reasoning_parser_obj = None
        tool_parser_obj = None
        if self.reasoning_parser:
            reasoning_parser_obj = ReasoningParserManager.get_reasoning_parser(self.reasoning_parser)
        if self.tool_parser:
            tool_parser_obj = ToolParserManager.get_tool_parser(self.tool_parser)
        architectures = ModelConfig({"model": self.model_name_or_path}).architectures[0]
        if not self.enable_mm:
            if not ErnieArchitectures.contains_ernie_arch(architectures):
@@ -78,6 +84,7 @@ class InputPreprocessor:
                self.processor = DataProcessor(
                    model_name_or_path=self.model_name_or_path,
                    reasoning_parser_obj=reasoning_parser_obj,
                    tool_parser_obj=tool_parser_obj,
                )
            else:
                from fastdeploy.input.ernie_processor import ErnieProcessor

@@ -85,6 +92,7 @@ class InputPreprocessor:
                self.processor = ErnieProcessor(
                    model_name_or_path=self.model_name_or_path,
                    reasoning_parser_obj=reasoning_parser_obj,
                    tool_parser_obj=tool_parser_obj,
                )
        else:
            if not architectures.startswith("Ernie4_5_VLMoeForConditionalGeneration"):
@@ -97,5 +105,6 @@ class InputPreprocessor:
                limit_mm_per_prompt=self.limit_mm_per_prompt,
                mm_processor_kwargs=self.mm_processor_kwargs,
                reasoning_parser_obj=reasoning_parser_obj,
                tool_parser_obj=tool_parser_obj,
            )
        return self.processor
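Editor's note (not part of the diff): the lookup performed inside `create_processor()` can also be done directly; a short sketch, assuming a FastDeploy installation that includes the parsers registered in this change:

```python
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
from fastdeploy.reasoning import ReasoningParserManager

# Both managers map a registered name to a parser class; the class is instantiated
# later with the model tokenizer, exactly as the processors above do.
tool_parser_cls = ToolParserManager.get_tool_parser("ernie_x1")
reasoning_parser_cls = ReasoningParserManager.get_reasoning_parser("ernie_x1")
print(tool_parser_cls.__name__, reasoning_parser_cls.__name__)
```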
@@ -148,7 +148,7 @@ class BaseDataProcessor(ABC):


class DataProcessor(BaseDataProcessor):
    def __init__(self, model_name_or_path, reasoning_parser_obj=None):
    def __init__(self, model_name_or_path, reasoning_parser_obj=None, tool_parser_obj=None):
        """
        Initializes the DecodeStatus object.

@@ -168,6 +168,8 @@ class DataProcessor(BaseDataProcessor):
        self._init_config()

        self.decode_status = dict()
        self.tool_parser_dict = dict()
        self.reasoning_end_dict = dict()
        self.tokenizer = self._load_tokenizer()
        data_processor_logger.info(
            f"tokenizer information: bos_token is {self.tokenizer.bos_token}, {self.tokenizer.bos_token_id}, \
@@ -180,6 +182,7 @@ class DataProcessor(BaseDataProcessor):
        self.eos_token_id_len = len(self.eos_token_ids)
        self.pad_token_id = self.get_pad_id()
        self.reasoning_parser = None
        self.tool_parser_obj = tool_parser_obj
        if reasoning_parser_obj:
            self.reasoning_parser = reasoning_parser_obj(self.tokenizer)
        self.tokenizer.pad_token_id = self.pad_token_id
@@ -281,6 +284,7 @@ class DataProcessor(BaseDataProcessor):
        # processing prompt_token_ids
        if not request.get("prompt_token_ids"):
            if "prompt" in request:
                request["text_after_process"] = request["prompt"]
                request["prompt_token_ids"] = self.text2ids(request["prompt"], max_model_len).tolist()
            elif "messages" in request:
                if self.tokenizer.chat_template is None:
@@ -328,6 +332,12 @@ class DataProcessor(BaseDataProcessor):
        else:
            # The model does not support thinking, and enable_thinking was not explicitly set to false
            response_dict.outputs.text = full_text
        if self.tool_parser_obj:
            tool_parser = self.tool_parser_obj(self.tokenizer)
            tool_call_info = tool_parser.extract_tool_calls(full_text, response_dict)
            if tool_call_info.tools_called:
                response_dict.outputs.tool_calls = tool_call_info.tool_calls
                response_dict.outputs.text = tool_call_info.content
        data_processor_logger.info(f"req_id:{req_id}, token_ids: {token_ids}")

        return response_dict
@@ -352,12 +362,19 @@ class DataProcessor(BaseDataProcessor):
        delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
        if is_end:
            full_text = previous_texts + delta_text
            response_dict["outputs"]["raw_prediction"] = full_text
            if enable_thinking and self.reasoning_parser:
                reasoning_content, text = self.reasoning_parser.extract_reasoning_content(full_text, response_dict)
                response_dict["outputs"]["text"] = text
                response_dict["outputs"]["reasoning_content"] = reasoning_content
            else:
                response_dict["outputs"]["text"] = full_text
            if self.tool_parser_obj:
                tool_parser = self.tool_parser_obj(self.tokenizer)
                tool_call_info = tool_parser.extract_tool_calls(full_text, response_dict)
                if tool_call_info.tools_called:
                    response_dict["outputs"]["tool_call"] = tool_call_info.tool_calls
                    response_dict["outputs"]["text"] = tool_call_info.content
            data_processor_logger.info(f"req_id:{req_id}, decode_status: {self.decode_status[req_id]}")
            del self.decode_status[req_id]
        return response_dict
@@ -381,9 +398,13 @@ class DataProcessor(BaseDataProcessor):
        if token_ids[-1] == self.tokenizer.eos_token_id:
            token_ids = token_ids[:-1]
        delta_text, previous_token_ids, previous_texts = self.ids2tokens(token_ids, req_id)

        if enable_thinking and self.reasoning_parser:
            reasoning_content, text = self.reasoning_parser.extract_reasoning_content_streaming(
        response_dict["outputs"]["raw_prediction"] = delta_text
        if (
            self.reasoning_parser
            and req_id not in self.reasoning_end_dict
            and (enable_thinking or self.reasoning_parser.__class__.__name__ == "ErnieX1ReasoningParser")
        ):
            reasoning_delta_message = self.reasoning_parser.extract_reasoning_content_streaming(
                previous_texts,
                previous_texts + delta_text,
                delta_text,
@@ -391,13 +412,31 @@ class DataProcessor(BaseDataProcessor):
                previous_token_ids + token_ids,
                token_ids,
            )
            response_dict["outputs"]["text"] = text
            response_dict["outputs"]["reasoning_content"] = reasoning_content
        else:
            response_dict["outputs"]["text"] = delta_text
            response_dict["outputs"]["reasoning_delta_message"] = reasoning_delta_message
            if self.reasoning_parser.is_reasoning_end(previous_token_ids + token_ids):
                self.reasoning_end_dict[req_id] = True
        if self.tool_parser_obj and req_id in self.reasoning_end_dict:
            if req_id not in self.tool_parser_dict:
                self.tool_parser_dict[req_id] = self.tool_parser_obj(self.tokenizer)
            tool_parser = self.tool_parser_dict[req_id]
            tool_call = tool_parser.extract_tool_calls_streaming(
                previous_texts,
                previous_texts + delta_text,
                delta_text,
                previous_token_ids,
                previous_token_ids + token_ids,
                token_ids,
                response_dict,
            )
            response_dict["outputs"]["tool_delta_message"] = tool_call
        response_dict["outputs"]["text"] = delta_text
        if is_end:
            data_processor_logger.info(f"req_id:{req_id}, decode_status: {self.decode_status[req_id]}")
            del self.decode_status[req_id]
            if req_id in self.tool_parser_dict:
                del self.tool_parser_dict[req_id]
            if req_id in self.reasoning_end_dict:
                del self.reasoning_end_dict[req_id]
        return response_dict

    def process_response_dict(self, response_dict, **kwargs):
@@ -472,6 +511,7 @@ class DataProcessor(BaseDataProcessor):
            add_special_tokens=False,
            return_tensors="pd",
        )
        request["text_after_process"] = spliced_message
        req_id = None
        tokens = self.tokenizer.tokenize(spliced_message)
        if isinstance(request, dict):
@@ -67,6 +67,7 @@ class ZmqClient:
        """
        self.router = self.context.socket(zmq.ROUTER)
        self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
        self.router.setsockopt(zmq.SNDTIMEO, -1)
        self.router.bind(f"ipc://{self.router_path}")

@@ -111,7 +112,6 @@ class ZmqClient:
        """
        if self.router is None:
            raise RuntimeError("Router socket not created. Call create_router() first.")

        while self.running:
            with self.mutex:
                if req_id not in self.req_dict:
@@ -124,7 +124,11 @@ class ZmqClient:
                    continue
                else:
                    break

        if self.req_dict[req_id] == -1:
            if data[-1].finished:
                with self.mutex:
                    self.req_dict.pop(req_id, None)
            return
        try:
            start_send = time.time()
            if self.aggregate_send:
@@ -133,7 +137,9 @@ class ZmqClient:
                result = msgpack.packb([response.to_dict() for response in data])
            self.router.send_multipart([self.req_dict[req_id], b"", result])
            llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")

        except zmq.ZMQError as e:
            llm_logger.error(f"[{req_id}] zmq error: {e}")
            self.req_dict[req_id] = -1
        except Exception as e:
            llm_logger.error(f"Send result to zmq client failed: {e}")
@@ -1,4 +1,5 @@
import json
import os

from fastapi import FastAPI
from opentelemetry import trace
@@ -176,7 +177,22 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
            return
        # extract Trace context from request.metadata.trace_carrier
        ctx = extract_from_metadata(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
            pass
    except:
        pass


def fd_start_span(span_name, kind=trace.SpanKind.CLIENT):
    """
    when fd start, start a new span show start success
    """
    try:
        if not traces_enable:
            return
        with tracer.start_as_current_span(span_name, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
            pass
    except:
        pass
@@ -191,7 +207,8 @@ def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
            return
        # extract Trace context from request.metadata.trace_carrier
        ctx = extract_from_request(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
            pass
    except:
        pass
@@ -14,7 +14,6 @@
# limitations under the License.
"""

# cipher_token=WjI1fQOvhN # do not edit this line
from typing import Optional

import paddle

@@ -208,7 +208,7 @@ class FlashAttentionBackend(AttentionBackend):
        ) = pre_cache_len_concat(
            forward_meta.seq_lens_decoder,
            forward_meta.seq_lens_this_time,
            metadata.set_max_lengths[2],
            forward_meta.max_len_tensor_cpu[2],
            self.block_size,
        )

@@ -215,11 +215,13 @@ def load_pre_sharded_checkpoint(model_path: str, local_rank: int, use_fastsafete
    """
    load_pre_sharded_checkpoint
    """
    from fastdeploy.model_executor.layers.utils import get_tensor

    state_dict = {}
    _, safetensor_files = get_all_safetensors(os.path.join(model_path, f"rank{local_rank}"))
    weights_iterator = safetensors_weights_iterator(safetensor_files)
    for name, weight in weights_iterator:
        state_dict[name] = weight
        state_dict[name] = get_tensor(weight)
    return state_dict
@@ -315,7 +315,7 @@ class DeepseekV3MLAAttention(nn.Layer):
            dtype=layernorm_out.dtype,
        )

        if forward_meta.max_enc_len_this_time:
        if forward_meta.max_len_tensor_cpu[1]:  # max_enc_len_this_time
            query = self.q_a_proj(layernorm_out)
            query = self.q_a_layernorm(query)
            query = self.q_b_proj(query)
@@ -362,7 +362,7 @@ class DeepseekV3MLAAttention(nn.Layer):
            fmha_out_prefill = fmha_out_prefill * mask_encoder_batch.cast(fmha_out_prefill.dtype)

            fmha_out = fmha_out + fmha_out_prefill
        if forward_meta.max_dec_len_this_time:
        if forward_meta.max_len_tensor_cpu[2]:  # max_dec_len_this_time
            query = self.q_a_proj(layernorm_out)
            query = self.q_a_layernorm(query)
            ln_out_or_q_c = query

@@ -432,6 +432,11 @@ class TokenProcessor:
        tokens = tokens[2 : batch + 2]

        batch_result = list()
        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
            need_to_be_reschedule_req_ids = list(self.resource_manager.to_be_rescheduled_request_id_set)
            for request_id in need_to_be_reschedule_req_ids:
                if self.resource_manager.requests[request_id].idx >= (batch - 1):  # No more token generated for preempted request
                    self.resource_manager.reschedule_preempt_task(request_id)
        for i in range(batch):
            if self.resource_manager.stop_flags[i]:
                continue
@@ -458,6 +463,9 @@ class TokenProcessor:
            if recovery_stop:
                llm_logger.info(f"recovery stop signal found at task {task_id}")
            if not recovery_stop and token_id < 0:
                if envs.ENABLE_V1_KVCACHE_SCHEDULER:
                    if task_id in self.resource_manager.to_be_rescheduled_request_id_set:
                        self.resource_manager.reschedule_preempt_task(task_id)
                continue

            if task.get("prefill_chunk_info", None) is not None:
@@ -16,6 +16,7 @@

from .abs_reasoning_parsers import ReasoningParser, ReasoningParserManager
from .ernie_vl_reasoning_parsers import ErnieVLReasoningParser
from .ernie_x1_reasoning_parsers import ErnieX1ReasoningParser
from .qwen3_reasoning_parsers import Qwen3ReasoningParser

__all__ = [
@@ -23,4 +24,5 @@ __all__ = [
    "ReasoningParserManager",
    "ErnieVLReasoningParser",
    "Qwen3ReasoningParser",
    "ErnieX1ReasoningParser",
]
@@ -65,18 +65,16 @@ class ErnieVLReasoningParser(ReasoningParser):
        """
        # Skip single special tokens
        if len(delta_token_ids) == 1 and delta_token_ids[0] == self.think_end_token_id:
            return "", ""
            return None
        if self.think_end_token_id in delta_token_ids:
            end_index = delta_text.find(self.end_token)
            reasoning_content = delta_text[:end_index]
            content = delta_text[end_index + len(self.end_token) :]
            content = delta_text[end_index + len(self.end_token)]
            return DeltaMessage(reasoning_content=reasoning_content, content=content)
        elif self.think_end_token_id in previous_token_ids:
            reasoning_content = ""
            content = delta_text
            return DeltaMessage(content=delta_text)
        else:
            reasoning_content = delta_text
            content = ""
            return reasoning_content, content
            return DeltaMessage(reasoning_content=delta_text)

    def extract_reasoning_content(
        self, model_output: str, request: ChatCompletionRequest
162  fastdeploy/reasoning/ernie_x1_reasoning_parsers.py  Normal file
@@ -0,0 +1,162 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Sequence
from typing import Tuple, Union

from fastdeploy.entrypoints.openai.protocol import ChatCompletionRequest, DeltaMessage
from fastdeploy.reasoning import ReasoningParser, ReasoningParserManager


@ReasoningParserManager.register_module("ernie_x1")
class ErnieX1ReasoningParser(ReasoningParser):
    """
    Reasoning parser for ernie_x1 model with stricter boundary checking.

    This implementation follows the user's proposed approach:
    1. For thinking content: waits for \n then checks for </think> tag
    2. For response content: checks for <response> tag first, then waits for \n
    3. Handles newlines in content more precisely
    """

    def __init__(self, tokenizer):
        super().__init__(tokenizer)
        self.think_end_token = "</think>"
        self.response_start_token = "<response>"
        self.response_end_token = "</response>"
        self.tool_call_start_token = "<tool_call>"
        self.tool_call_end_token = "</tool_call>"

        if not self.model_tokenizer:
            raise ValueError("The model tokenizer must be passed to the ReasoningParser constructor.")

        self.think_end_token_id = self.vocab.get("</think>")
        if self.think_end_token_id is None:
            raise RuntimeError("Could not find think end token id in tokenizer vocabulary")
        self.tool_call_start_token_id = self.vocab.get("<tool_call>")

    def is_reasoning_end(self, input_ids: list[int]) -> bool:
        return self.tool_call_start_token_id in input_ids

    def extract_reasoning_content_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
    ) -> Union[DeltaMessage, None]:
        """
        Streaming parsing, implemented as intended:
        1. Everything at the start is treated as thinking content and returned as delta_text, ""
        2. When a \n is seen, check whether </think> follows
        3. A </think> encountered directly also ends the thinking phase
        4. After thinking ends, check whether <response> or <tool_call> follows
        5. For <response> content, handle the various boundary conditions
        """
        if len(delta_token_ids) == 1 and delta_token_ids[0] == self.think_end_token_id:
            return None
        # Thinking-phase handling
        if not previous_text.endswith(self.think_end_token) and self.think_end_token not in previous_text:
            # On a bare \n, do not return yet; wait for the next delta_text
            if delta_text == "\n":
                return None
            # Previous delta ended with \n and this one starts with </think>: thinking ends
            elif previous_text.endswith("\n") and delta_text.startswith(self.think_end_token):
                return None
            # A direct </think> also ends thinking
            elif delta_text.startswith(self.think_end_token):
                return None
            # Otherwise keep returning thinking content
            return DeltaMessage(reasoning_content=delta_text)

        # After thinking, check for tool_call vs response
        remaining_text = previous_text + delta_text
        after_think = remaining_text[remaining_text.find(self.think_end_token) + len(self.think_end_token) :]
        after_think = after_think.lstrip("\n")  # skip the newlines after </think>

        # Handle the tool_call case
        if after_think.startswith(self.tool_call_start_token):
            return None

        # Handle the response case
        if after_think.startswith(self.response_start_token):
            # Do not return immediately on the <response> tag itself
            if delta_text == self.response_start_token:
                return None
            # Nor on the newline right after <response>
            elif delta_text == "\n" and previous_text.endswith(self.response_start_token):
                return None
            # Handle newlines inside the response content
            if delta_text == "\n":
                return None
            # Previous delta ended with \n and this one is </response>: the response ends
            elif previous_text.endswith("\n") and delta_text == self.response_end_token:
                return None
            # A direct </response> also ends the response
            elif delta_text == self.response_end_token:
                return None
            # Otherwise return the actual content
            else:
                return DeltaMessage(content=delta_text)

        # Default: return nothing
        return None

    def extract_reasoning_content(self, model_output: str, request: ChatCompletionRequest) -> Tuple[str, str]:
        """
        Batch version of the enhanced parser.
        Modified to preserve newlines in both reasoning and response content,
        only removing the single newline before closing tags.
        """
        reasoning_content = ""
        response_content = ""

        think_end_pos = model_output.find(self.think_end_token)
        if think_end_pos != -1:
            # Extract thinking content - only remove the last newline before </think>
            reasoning_content = model_output[:think_end_pos]
            if think_end_pos > 0 and reasoning_content[-1] == "\n":
                reasoning_content = reasoning_content[:-1]

            remaining = model_output[think_end_pos + len(self.think_end_token) :]

            # Skip newlines after </think>
            remaining = remaining.lstrip("\n")

            # Check for response or tool_call
            if remaining.startswith(self.response_start_token):
                response_pos = len(self.response_start_token)
                remaining = remaining[response_pos:].lstrip("\n")
                response_end_pos = remaining.find(self.response_end_token)
                if response_end_pos != -1:
                    # Only strip the last newline before </response>, not all
                    if response_end_pos > 0 and remaining[response_end_pos - 1] == "\n":
                        response_content = remaining[: response_end_pos - 1]
                    else:
                        response_content = remaining[:response_end_pos]
                else:
                    # If no </response> found, return the rest as response content
                    response_content = remaining
            elif remaining.startswith(self.tool_call_start_token):
                pass  # No response content
        else:
            # No thinking content found, return the whole input as reasoning
            reasoning_content = model_output
            response_content = ""
        return reasoning_content, response_content
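Editor's note (not part of the diff): the batch method above splits an ernie_x1-style completion into reasoning and response parts. A standalone illustration of the same splitting rules on a hand-written sample (it does not instantiate the parser class, which needs a real tokenizer containing the </think> and <tool_call> special tokens):

```python
# Illustration only: mirrors the string-splitting rules of extract_reasoning_content.
sample = "I should check the weather first.\n</think>\n\n<response>\nIt is sunny in Beijing.\n</response>"

think_end = sample.find("</think>")
reasoning = sample[:think_end].rstrip("\n")
rest = sample[think_end + len("</think>"):].lstrip("\n")
response = rest[len("<response>"):].lstrip("\n").split("</response>")[0].rstrip("\n")

print(reasoning)  # I should check the weather first.
print(response)   # It is sunny in Beijing.
```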
@@ -86,6 +86,7 @@ class BaseRLModel(nn.Layer):
        super(BaseRLModel, self).__init__()
        self.infer_to_train_mapping = {}
        self.fd_config = None
        self._mappings_built = False

    @classmethod
    def name(cls) -> str:
@@ -142,6 +143,12 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM, BaseRLModel):

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        """Generate mapping between inference and training parameter for RL(donot delete!)."""
        if self._mappings_built:
            return self.infer_to_train_mapping

        self.infer_to_train_mapping = {}
        self._mappings_built = True

        # Prepare placeholders
        place_holders = ["weight"]

@@ -215,6 +222,11 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        """Generate mapping between inference and training parameter for RL(donot delete!)."""
        if self._mappings_built:
            return self.infer_to_train_mapping

        self.infer_to_train_mapping = {}
        self._mappings_built = True
        # Prepare placeholders
        place_holders = ["weight"]

@@ -316,6 +328,11 @@ class Qwen2ForCausalLMRL(Qwen2ForCausalLM, BaseRLModel):

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        """Generate mapping between inference and training parameter for RL(donot delete!)."""
        if self._mappings_built:
            return self.infer_to_train_mapping

        self.infer_to_train_mapping = {}
        self._mappings_built = True
        # Prepare placeholders
        place_holders = ["weight"]

@@ -360,6 +377,11 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM, BaseRLModel):

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        """Generate mapping between inference and training parameter for RL(donot delete!)."""
        if self._mappings_built:
            return self.infer_to_train_mapping

        self.infer_to_train_mapping = {}
        self._mappings_built = True
        # Prepare placeholders
        place_holders = ["weight"]

@@ -429,4 +451,30 @@ class Qwen3ForCausalLMRL(Qwen3ForCausalLM, BaseRLModel):
        return "Qwen3ForCausalLMRL"

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        pass

        if self._mappings_built:
            return self.infer_to_train_mapping

        self.infer_to_train_mapping = {}
        self._mappings_built = True
        # Prepare placeholders
        place_holders = ["weight"]

        # Initialize mapping dictionary
        self._update_base_mappings("model")
        base_name = "model.layers"

        # Helper function to add layer mappings
        def _add_layer_mappings(layer_idx):
            # FFN mappings
            for ph in place_holders:
                self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = (
                    f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
                )

        for layer_idx in range(self.fd_config.model_config.num_hidden_layers):
            _add_layer_mappings(layer_idx)

        self._complete_missing_mappings()

        return self.infer_to_train_mapping
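Editor's note (not part of the diff): a small sketch of the dictionary that the Qwen3 FFN mapping loop above produces (parameter names are taken verbatim from that loop; the layer count here is illustrative):

```python
# Inference-side up_gate_proj weights map to the training-side fused gate_up projection.
base_name = "model.layers"
mapping = {
    f"{base_name}.{i}.mlp.up_gate_proj.weight": f"{base_name}.{i}.mlp.gate_up_fused_proj.weight"
    for i in range(2)
}
print(mapping)
```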
@@ -107,7 +107,7 @@ class MTPProposer(Proposer):
            idx = i
            self.model_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
            self.model_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
            self.model_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
            self.model_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
            self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0
            self.model_inputs["step_idx"][idx : idx + 1] = 0
@@ -118,6 +118,7 @@ class MTPProposer(Proposer):
            self.model_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                idx * block_num, (idx + 1) * block_num, 1
            )
        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

    def initialize_kv_cache(self):
        """
@@ -263,7 +264,8 @@ class MTPProposer(Proposer):
        # Same shape/dtype with base model
        self.model_inputs["block_tables"] = paddle.clone(self.main_model_inputs["block_tables"])
        self.model_inputs["input_ids"] = paddle.clone(self.main_model_inputs["input_ids"])
        self.model_inputs["seq_lens_this_time"] = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
        self.seq_lens_this_time_buffer = paddle.clone(self.main_model_inputs["seq_lens_this_time"])

        self.model_inputs["seq_lens_encoder"] = paddle.clone(self.main_model_inputs["seq_lens_encoder"])
        self.model_inputs["seq_lens_decoder"] = paddle.clone(self.main_model_inputs["seq_lens_decoder"])
        self.model_inputs["step_idx"] = paddle.clone(self.main_model_inputs["step_idx"])
@@ -338,7 +340,7 @@ class MTPProposer(Proposer):
            self.main_model_inputs["seq_lens_this_time"], fill_value=-1, dtype="int32"
        )

    def insert_prefill_inputs(self, req_dicts: List[Request]):
    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int):
        """
        Process inputs for prefill tasks and insert it to model_inputs buffer
        """
@@ -372,7 +374,7 @@ class MTPProposer(Proposer):

                self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                self.model_inputs["seq_lens_decoder"][idx : idx + 1] = length
                self.model_inputs["seq_lens_this_time"][idx : idx + 1] = prefill_token_num
                self.seq_lens_this_time_buffer[idx : idx + 1] = prefill_token_num

                self.model_inputs["stop_flags"][idx : idx + 1] = False
                self.model_inputs["batch_drop"][idx : idx + 1] = False
@@ -397,10 +399,10 @@ class MTPProposer(Proposer):
                if self.cache_config.enable_chunked_prefill:
                    token_chunk_size = request.prefill_chunk_info[0]
                    self.model_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                else:
                    self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length
                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = length
                    self.seq_lens_this_time_buffer[idx : idx + 1] = length

                self.model_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                self.model_inputs["stop_flags"][idx : idx + 1] = False
@@ -413,6 +415,7 @@ class MTPProposer(Proposer):
                    request.get("block_tables"), dtype="int32"
                )
            self.model_inputs["not_need_stop"][0] = True
        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

    def _initialize_forward_meta(self):
        """
@@ -15,6 +15,7 @@
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import codecs
|
||||
import importlib
|
||||
import logging
|
||||
@@ -22,6 +23,7 @@ import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import tarfile
|
||||
import time
|
||||
from datetime import datetime
|
||||
@@ -291,6 +293,16 @@ def extract_tar(tar_path, output_dir):
|
||||
raise RuntimeError(f"Extraction failed: {e!s}")
|
||||
|
||||
|
||||
def get_limited_max_value(max_value):
|
||||
def validator(value):
|
||||
value = float(value)
|
||||
if value > max_value:
|
||||
raise argparse.ArgumentTypeError(f"The value cannot exceed {max_value}")
|
||||
return value
|
||||
|
||||
return validator
|
||||
|
||||
|
||||
def download_model(url, output_dir, temp_tar):
|
||||
"""
|
||||
下载模型,并将其解压到指定目录。
|
||||
@@ -580,6 +592,22 @@ def is_list_of(
|
||||
assert_never(check)
|
||||
|
||||
|
||||
def import_from_path(module_name: str, file_path: Union[str, os.PathLike]):
|
||||
"""
|
||||
Import a Python file according to its file path.
|
||||
"""
|
||||
spec = importlib.util.spec_from_file_location(module_name, file_path)
|
||||
if spec is None:
|
||||
raise ModuleNotFoundError(f"No module named '{module_name}'")
|
||||
|
||||
assert spec.loader is not None
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def version():
    """
    Prints the contents of the version.txt file located in the parent directory of this script.
@@ -596,6 +624,61 @@ def version():
    return content


class StatefulSemaphore:
    __slots__ = ("_semaphore", "_max_value", "_acquired_count", "_last_reset")

    """
    StatefulSemaphore is a class that wraps an asyncio.Semaphore and provides additional stateful information.
    """

    def __init__(self, value: int):
        """
        StatefulSemaphore constructor
        """
        if value < 0:
            raise ValueError("Value must be non-negative.")
        self._semaphore = asyncio.Semaphore(value)
        self._max_value = value
        self._acquired_count = 0
        self._last_reset = time.monotonic()

    async def acquire(self):
        await self._semaphore.acquire()
        self._acquired_count += 1

    def release(self):
        self._semaphore.release()

        self._acquired_count = max(0, self._acquired_count - 1)

    def locked(self) -> bool:
        return self._semaphore.locked()

    @property
    def available(self) -> int:
        return self._max_value - self._acquired_count

    @property
    def acquired(self) -> int:
        return self._acquired_count

    @property
    def max_value(self) -> int:
        return self._max_value

    @property
    def uptime(self) -> float:
        return time.monotonic() - self._last_reset

    def status(self) -> dict:
        return {
            "available": self.available,
            "acquired": self.acquired,
            "max_value": self.max_value,
            "uptime": round(self.uptime, 2),
        }


llm_logger = get_logger("fastdeploy", "fastdeploy.log")
data_processor_logger = get_logger("data_processor", "data_processor.log")
scheduler_logger = get_logger("scheduler", "scheduler.log")
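
StatefulSemaphore is intended as a concurrency gate whose occupancy can be reported back to callers, for example from a health or status endpoint. A minimal asyncio sketch of that pattern, assuming the class is importable from fastdeploy.utils; the worker coroutine and the limit of 2 are purely illustrative:

import asyncio

from fastdeploy.utils import StatefulSemaphore  # assumed import path

concurrency_gate = StatefulSemaphore(2)  # allow at most two in-flight requests

async def handle_request(i: int) -> None:
    await concurrency_gate.acquire()
    try:
        # The status dict exposes available/acquired/max_value/uptime for monitoring.
        print(f"request {i} running, gate status: {concurrency_gate.status()}")
        await asyncio.sleep(0.1)  # stand-in for real inference work
    finally:
        concurrency_gate.release()

async def main() -> None:
    await asyncio.gather(*(handle_request(i) for i in range(5)))

asyncio.run(main())
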
@@ -152,9 +152,11 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
schemata_key,
|
||||
)
|
||||
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request]):
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
|
||||
"""
|
||||
Process inputs for prefill tasks and insert it to share_inputs buffer
|
||||
req_dict: A list of Request dict
|
||||
num_running_requests: batch_size
|
||||
"""
|
||||
|
||||
if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
|
||||
@@ -193,7 +195,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = 1
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -205,7 +207,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
request.draft_token_ids[0:num_prefill_send_token],
|
||||
dtype="int64",
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
|
||||
else:
|
||||
self.share_inputs["pre_ids"][idx : idx + 1] = -1
|
||||
self.share_inputs["step_idx"][idx : idx + 1] = 0
|
||||
@@ -222,14 +224,14 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
)
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
|
||||
else:
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -270,13 +272,15 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
|
||||
if request.get("bad_words_token_ids") is not None:
|
||||
if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0:
|
||||
bad_words_len = len(request.get("bad_words_token_ids"))
|
||||
if bad_words_len > 0:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
else:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = 1
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64")
|
||||
|
||||
if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None:
|
||||
stop_seqs_num = len(request.get("stop_seqs_len"))
|
||||
@@ -293,6 +297,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
|
||||
if self.speculative_method in ["mtp"]:
|
||||
self.proposer.insert_prefill_inputs(req_dicts)
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
|
||||
|
||||
def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
|
||||
"""Set dummy prefill inputs to share_inputs"""
|
||||
@@ -311,7 +316,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
|
||||
@@ -329,6 +334,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
|
||||
idx * block_num, (idx + 1) * block_num, 1
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
|
||||
|
||||
def _init_share_inputs(self, max_num_seqs: int):
|
||||
"""
|
||||
@@ -379,7 +385,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["max_length"] = paddle.full(
|
||||
[max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
@@ -921,6 +927,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
def execute_model(
|
||||
self,
|
||||
model_forward_batch: Optional[List[Request]] = None,
|
||||
num_running_requests: int = None,
|
||||
) -> Optional[ModelRunnerOutput]:
|
||||
"""
|
||||
The Entrance of model execute.
|
||||
@@ -928,6 +935,7 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
model_forward_batch: 'Request' contains information related to prompt and is an abstract
|
||||
class at the server level, which is too granular for ModelRunner.
|
||||
We plan to replace it with 'ModelForwardBatch'.
|
||||
num_running_requests: batch_size
|
||||
intermediate_tensors:
|
||||
"""
|
||||
# If `not_need_stop`` is False, it means the current worker is in an idle state.
|
||||
@@ -1053,6 +1061,9 @@ class GCUModelRunner(ModelRunnerBase):
|
||||
|
||||
self._update_chunked_prefill(model_forward_batch)
|
||||
self._add_cache(model_forward_batch)
|
||||
self.seq_lens_this_time_buffer[:num_running_requests].copy_(
|
||||
self.share_inputs["seq_lens_this_time"][:num_running_requests], False
|
||||
)
|
||||
return None
|
||||
|
||||
def _add_cache(self, model_forward_batch) -> None:
|
||||
|
@@ -105,17 +105,18 @@ class GcuWorker(WorkerBase):
    def execute_model(
        self,
        model_forward_batch: Optional[List[Request]] = None,
        num_running_requests: int = None,
    ) -> Optional[ModelRunnerOutput]:
        """ """
        output = self.model_runner.execute_model(model_forward_batch)
        output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
        return output

    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
        """Process new requests and then start the decode loop
        TODO(gongshaotian):The scheduler should schedule the handling of prefill,
        and workers and modelrunners should not perceive it.
        """
        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

    def graph_optimize_and_warm_up_model(self) -> None:
        """
|
@@ -164,6 +164,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
if self.speculative_method == "ngram":
|
||||
self.proposer = NgramProposer(self.fd_config)
|
||||
elif self.speculative_method == "mtp":
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
|
||||
self.proposer = MTPProposer(
|
||||
self.fd_config,
|
||||
self.get_model(),
|
||||
@@ -193,9 +194,11 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
return self.guided_backend.get_logits_processor(schemata_key=schemata_key), schemata_key
|
||||
|
||||
def insert_tasks_v1(self, req_dicts: List[Request]):
|
||||
def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
|
||||
"""
|
||||
Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
|
||||
req_dict: A list of Request dict
|
||||
num_running_requests: batch_size
|
||||
"""
|
||||
# NOTE(luotingdan): Lazy initialize kv cache
|
||||
if "caches" not in self.share_inputs:
|
||||
@@ -203,11 +206,11 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
req_len = len(req_dicts)
|
||||
has_prefill_task = False
|
||||
has_decode_task = False
|
||||
for i in range(req_len):
|
||||
request = req_dicts[i]
|
||||
idx = request.idx
|
||||
if request.task_type.value == RequestType.PREFILL.value: # prefill task
|
||||
logger.debug(f"Handle prefill request {request} at idx {idx}")
|
||||
prefill_start_index = request.prefill_start_index
|
||||
prefill_end_index = request.prefill_end_index
|
||||
length = prefill_end_index - prefill_start_index
|
||||
@@ -253,6 +256,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
)
|
||||
|
||||
input_ids = request.prompt_token_ids + request.output_token_ids
|
||||
logger.debug(f"Handle prefill request {request} at idx {idx} prefill_start_index {prefill_start_index} prefill_end_index {prefill_end_index} need_prefilled_token_num {len(input_ids)}")
|
||||
self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array(
|
||||
input_ids[prefill_start_index:prefill_end_index]
|
||||
)
|
||||
@@ -264,7 +268,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
)
|
||||
self.share_inputs["stop_flags"][idx : idx + 1] = False
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
|
||||
@@ -281,12 +285,14 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
if self.share_inputs["is_block_step"][idx]: # has tasks to continue to decode
|
||||
has_decode_task = True
|
||||
continue
|
||||
else: # preempted task
|
||||
logger.debug(f"Handle preempted request {request} at idx {idx}")
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :] = -1
|
||||
self.share_inputs["stop_flags"][idx : idx + 1] = True
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = 0
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["is_block_step"][idx : idx + 1] = False
|
||||
@@ -326,12 +332,15 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
else:
|
||||
self.share_inputs["stop_seqs_len"][idx : idx + 1, :] = 0
|
||||
|
||||
if has_prefill_task:
|
||||
if has_prefill_task or has_decode_task:
|
||||
self.share_inputs["not_need_stop"][0] = True
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
|
||||
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request]):
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
|
||||
"""
|
||||
Process inputs for prefill tasks and insert it to share_inputs buffer
|
||||
req_dict: A list of Request dict
|
||||
num_running_requests: batch_size
|
||||
TODO(gongshaotian): Refactor this func
|
||||
"""
|
||||
|
||||
@@ -365,7 +374,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = 1
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -377,7 +386,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
request.draft_token_ids[0:num_prefill_send_token],
|
||||
dtype="int64",
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
|
||||
else:
|
||||
self.share_inputs["pre_ids"][idx : idx + 1] = -1
|
||||
self.share_inputs["step_idx"][idx : idx + 1] = 0
|
||||
@@ -412,7 +421,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
)
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
|
||||
@@ -430,7 +439,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
else:
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -489,13 +498,15 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
|
||||
if request.get("bad_words_token_ids") is not None:
|
||||
if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0:
|
||||
bad_words_len = len(request.get("bad_words_token_ids"))
|
||||
if bad_words_len > 0:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
else:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = 1
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64")
|
||||
|
||||
if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None:
|
||||
stop_seqs_num = len(request.get("stop_seqs_len"))
|
||||
@@ -514,8 +525,10 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
self.share_inputs["not_need_stop"][0] = True
|
||||
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
|
||||
|
||||
if self.speculative_method in ["mtp"]:
|
||||
self.proposer.insert_prefill_inputs(req_dicts)
|
||||
self.proposer.insert_prefill_inputs(req_dicts, num_running_requests)
|
||||
|
||||
def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
|
||||
"""Set dummy prefill inputs to share_inputs"""
|
||||
@@ -535,7 +548,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
|
||||
@@ -553,6 +566,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
|
||||
idx * block_num, (idx + 1) * block_num, 1
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
|
||||
|
||||
def _init_share_inputs(self, max_num_seqs: int):
|
||||
"""
|
||||
@@ -603,7 +617,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["max_length"] = paddle.full(
|
||||
[max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
@@ -1247,6 +1261,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
def execute_model(
|
||||
self,
|
||||
model_forward_batch: Optional[List[Request]] = None,
|
||||
num_running_requests: int = None,
|
||||
) -> Optional[ModelRunnerOutput]:
|
||||
"""
|
||||
The Entrance of model execute.
|
||||
@@ -1255,6 +1270,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
class at the server level, which is too granular for ModelRunner.
|
||||
We plan to replace it with 'ModelForwardBatch'.
|
||||
intermediate_tensors:
|
||||
num_running_requests: batch_size
|
||||
"""
|
||||
# 1. Prepare inputs of model and sampler.
|
||||
skip_idx_list = self._get_skip_idx(model_forward_batch)
|
||||
@@ -1356,8 +1372,8 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
|
||||
enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
|
||||
think_end_id=(self.model_config.think_end_id if self.enable_mm else -1),
|
||||
need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None),
|
||||
reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
|
||||
need_think_end=(self.share_inputs["need_think_end"][:num_running_requests] if self.enable_mm else None),
|
||||
reasoning_index=(self.share_inputs["reasoning_index"][:num_running_requests] if self.enable_mm else None),
|
||||
stop_token_ids=self.share_inputs["stop_seqs"],
|
||||
stop_seqs_len=self.share_inputs["stop_seqs_len"],
|
||||
)
|
||||
@@ -1397,6 +1413,10 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
self._update_chunked_prefill(model_forward_batch)
|
||||
self._add_cache(model_forward_batch)
|
||||
|
||||
self.seq_lens_this_time_buffer[:num_running_requests].copy_(
|
||||
self.share_inputs["seq_lens_this_time"][:num_running_requests], False
|
||||
)
|
||||
return None
|
||||
|
||||
def _add_cache(self, model_forward_batch) -> None:
|
||||
|
@@ -175,20 +175,21 @@ class GpuWorker(WorkerBase):
    def execute_model(
        self,
        model_forward_batch: Optional[List[Request]] = None,
        num_running_request: int = None,
    ) -> Optional[ModelRunnerOutput]:
        """ """
        output = self.model_runner.execute_model(model_forward_batch)
        output = self.model_runner.execute_model(model_forward_batch, num_running_request)
        return output

    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
        """Process new requests and then start the decode loop
        TODO(gongshaotian):The scheduler should schedule the handling of prefill,
        and workers and modelrunners should not perceive it.
        """
        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
            self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
            self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
        else:
            self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
            self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

    def graph_optimize_and_warm_up_model(self) -> None:
        """
|
@@ -142,9 +142,10 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
schemata_key,
|
||||
)
|
||||
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request]):
|
||||
def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
|
||||
"""
|
||||
Process inputs for prefill tasks and insert it to share_inputs buffer
|
||||
num_running_requests: batch_size
|
||||
TODO(gongshaotian): Refactor this func
|
||||
"""
|
||||
|
||||
@@ -176,7 +177,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx : idx + 1, 0] = request.prompt_token_ids[0]
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = 1
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -188,7 +189,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
request.draft_token_ids[0:num_prefill_send_token],
|
||||
dtype="int64",
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
|
||||
else:
|
||||
self.share_inputs["pre_ids"][idx : idx + 1] = -1
|
||||
self.share_inputs["step_idx"][idx : idx + 1] = 0
|
||||
@@ -199,7 +200,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
request.set("chunk_idx", 1)
|
||||
logger.info(f"prefill_chunk_info: {request.prefill_chunk_info}")
|
||||
token_chunk_size = request.prefill_chunk_info[0]
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
|
||||
self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array(
|
||||
request.prompt_token_ids[:token_chunk_size]
|
||||
)
|
||||
@@ -211,7 +212,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
else:
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx : idx + 1] = length
|
||||
@@ -242,13 +243,15 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
|
||||
if request.get("bad_words_token_ids") is not None:
|
||||
if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0:
|
||||
bad_words_len = len(request.get("bad_words_token_ids"))
|
||||
if bad_words_len > 0:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
else:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = 1
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64")
|
||||
|
||||
if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None:
|
||||
stop_seqs_num = len(request.get("stop_seqs_len"))
|
||||
@@ -262,6 +265,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.sampler.apply_logits_processor(idx, request.get("logits_processor"), prefill_tokens)
|
||||
|
||||
self.share_inputs["not_need_stop"][0] = True
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
|
||||
|
||||
def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
|
||||
"""Set dummy prefill inputs to share_inputs"""
|
||||
@@ -281,7 +285,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
|
||||
self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
|
||||
self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
|
||||
self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
|
||||
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
|
||||
@@ -297,6 +301,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
|
||||
idx * block_num, (idx + 1) * block_num, 1
|
||||
)
|
||||
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
|
||||
|
||||
def _init_share_inputs(self, max_num_seqs: int):
|
||||
"""Initialize all share buffers for model inputs.
|
||||
@@ -342,7 +347,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["max_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
|
||||
self.share_inputs["min_length"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64")
|
||||
self.share_inputs["max_length"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
|
||||
self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
|
||||
@@ -859,6 +864,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
def execute_model(
|
||||
self,
|
||||
model_forward_batch: Optional[List[Request]] = None,
|
||||
num_running_requests: int = None,
|
||||
) -> Optional[ModelRunnerOutput]:
|
||||
"""
|
||||
The Entrance of model execute.
|
||||
@@ -866,6 +872,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
model_forward_batch: 'Request' contains information related to prompt and is an abstract
|
||||
class at the server level, which is too granular for ModelRunner.
|
||||
We plan to replace it with 'ModelForwardBatch'.
|
||||
num_running_requests: batch_size
|
||||
intermediate_tensors:
|
||||
"""
|
||||
# Note(@wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state.
|
||||
@@ -986,6 +993,9 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
|
||||
self._update_chunked_prefill(model_forward_batch)
|
||||
self._add_cache(model_forward_batch)
|
||||
self.seq_lens_this_time_buffer[:num_running_requests].copy_(
|
||||
self.share_inputs["seq_lens_this_time"][:num_running_requests], False
|
||||
)
|
||||
return None
|
||||
|
||||
def _add_cache(self, model_forward_batch) -> None:
|
||||
|
@@ -106,17 +106,18 @@ class IluvatarWorker(WorkerBase):
    def execute_model(
        self,
        model_forward_batch: Optional[List[Request]] = None,
        num_running_requests: int = None,
    ) -> Optional[ModelRunnerOutput]:
        """ """
        output = self.model_runner.execute_model(model_forward_batch)
        output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
        return output

    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
        """Process new requests and then start the decode loop
        TODO(gongshaotian):The scheduler should schedule the handling of prefill,
        and workers and modelrunners should not perceive it.
        """
        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

    def graph_optimize_and_warm_up_model(self) -> None:
        """
|
@@ -24,6 +24,7 @@ import paddle
import paddle.distributed as dist
from paddle.distributed import fleet

from fastdeploy import envs
from fastdeploy.config import (
    CacheConfig,
    DecodingConfig,
@@ -257,11 +258,11 @@ class PaddleDisWorkerProc:
    f"num_insert_requests: {len(req_dicts)}"
)
# Process prefill inputs
self.worker.preprocess_new_task(req_dicts)
self.worker.preprocess_new_task(req_dicts, num_running_requests)

# Execute model to generate token. The generated token will be written to the buffer.
# These generated tokens can be obtained through get_output op.
self.worker.execute_model()
self.worker.execute_model(num_running_requests)

def event_loop_normal(self) -> None:
    """Main event loop for Paddle Distributed Workers.
@@ -289,8 +290,9 @@ class PaddleDisWorkerProc:
if self.local_rank % mp_num_per_node == 0:
    if self.task_queue.num_tasks() > 0:
        # VL only support 1 batch to prefill

        if not self.fd_config.model_config.enable_mm or not self.worker.exist_prefill():
        if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
            self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
        ):
            if self.nnode > 1 and self.parallel_config.tensor_parallel_size > self.max_chips_per_node:
                self.task_queue.read_finish_flag.set(1)
            else:
@@ -338,7 +340,7 @@ class PaddleDisWorkerProc:
)

# Process prefill inputs
self.worker.preprocess_new_task(req_dicts)
self.worker.preprocess_new_task(req_dicts, num_running_requests)

if not self.worker.model_runner.not_need_stop():
    if self.ranks > 1:
@@ -349,7 +351,7 @@ class PaddleDisWorkerProc:

# Execute model to generate token. The generated token will be written to the buffer.
# These generated tokens can be obtained through get_output op.
self.worker.execute_model(req_dicts)
self.worker.execute_model(req_dicts, num_running_requests)
self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()

def initialize_kv_cache(self) -> None:
|
@@ -383,15 +383,18 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
|
||||
req_len = len(req_dicts)
|
||||
has_prefill_task = False
|
||||
has_decode_task = False
|
||||
for i in range(req_len):
|
||||
request = req_dicts[i]
|
||||
idx = request.idx
|
||||
if request.task_type.value == RequestType.PREFILL.value: # prefill task
|
||||
logger.debug(f"Handle prefill request {request} at idx {idx}")
|
||||
prefill_start_index = request.prefill_start_index
|
||||
prefill_end_index = request.prefill_end_index
|
||||
length = prefill_end_index - prefill_start_index
|
||||
input_ids = request.prompt_token_ids + request.output_token_ids
|
||||
logger.debug(
|
||||
f"Handle prefill request {request} at idx {idx} prefill_start_index {prefill_start_index} prefill_end_index {prefill_end_index} need_prefilled_token_num {len(input_ids)}"
|
||||
)
|
||||
self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array(
|
||||
input_ids[prefill_start_index:prefill_end_index]
|
||||
)
|
||||
@@ -420,6 +423,8 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
if self.share_inputs["is_block_step"][idx]: # has tasks to continue to decode
|
||||
has_decode_task = True
|
||||
continue
|
||||
else: # preempted task
|
||||
logger.debug(f"Handle preempted request {request} at idx {idx}")
|
||||
@@ -460,7 +465,7 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["stop_seqs"][:stop_seqs_num, : len(request.get("stop_token_ids")[0])] = np.array(
|
||||
request.get("stop_token_ids"), dtype="int64"
|
||||
)
|
||||
if has_prefill_task:
|
||||
if has_prefill_task or has_decode_task:
|
||||
self.share_inputs["not_need_stop"][0] = True
|
||||
|
||||
def process_prefill_inputs(self, req_dicts: List[Request]):
|
||||
@@ -506,13 +511,15 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
request.block_tables, dtype="int32"
|
||||
)
|
||||
|
||||
if request.get("bad_words_token_ids") is not None:
|
||||
if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0:
|
||||
bad_words_len = len(request.get("bad_words_token_ids"))
|
||||
if bad_words_len > 0:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array(
|
||||
request.get("bad_words_token_ids"), dtype="int64"
|
||||
)
|
||||
else:
|
||||
self.share_inputs["bad_tokens_len"][idx : idx + 1] = 1
|
||||
self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64")
|
||||
|
||||
if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None:
|
||||
stop_seqs_num = len(request.get("stop_seqs_len"))
|
||||
@@ -851,6 +858,7 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
self,
|
||||
model_forward_batch: Optional[List[Request]] = None,
|
||||
is_dummy_run: bool = False,
|
||||
num_running_requests: int = None,
|
||||
) -> Optional[ModelRunnerOutput]:
|
||||
"""
|
||||
The Entrance of model execute.
|
||||
@@ -858,6 +866,7 @@ class XPUModelRunner(ModelRunnerBase):
|
||||
model_forward_batch: 'Request' contains information related to prompt and is an abstract
|
||||
class at the server level, which is too granular for ModelRunner.
|
||||
We plan to replace it with 'ModelForwardBatch'.
|
||||
num_running_requests: batch_size
|
||||
intermediate_tensors:
|
||||
"""
|
||||
# 1. Prepare inputs of model and decoder.
|
||||
|
@@ -94,9 +94,14 @@ class XpuWorker(WorkerBase):
            xpu_get_used_global_memory,
        )

        total_memory = xpu_get_total_global_memory(self.local_rank)
        used_memory = xpu_get_used_global_memory(self.local_rank)
        free_memory = xpu_get_free_global_memory(self.local_rank)
        assert self.device_ids[self.local_rank] is not None, f"device_id is none for rank {self.local_rank}"
        assert (
            len(self.device_ids) > self.local_rank
        ), f"device number must be greater than local rank, but get device number is {len(self.device_ids)}, rank is {self.local_rank}"

        total_memory = xpu_get_total_global_memory(int(self.device_ids[self.local_rank]))
        used_memory = xpu_get_used_global_memory(int(self.device_ids[self.local_rank]))
        free_memory = xpu_get_free_global_memory(int(self.device_ids[self.local_rank]))

        logger.info(
            f"Before warm up, total_memory: {total_memory}, \
@@ -107,7 +112,7 @@ class XpuWorker(WorkerBase):
        self.model_runner.profile_run()

        total_available_memory = int(total_memory * self.cache_config.gpu_memory_utilization)
        used_memory = xpu_get_used_global_memory(self.local_rank)
        used_memory = xpu_get_used_global_memory(int(self.device_ids[self.local_rank]))
        available_kv_cache_memory = total_available_memory - used_memory
        model_block_memory_used = self.cal_theortical_kvcache()
        available_kv_cache_memory += model_block_memory_used * self.parallel_config.total_block_num
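
The KV-cache budget computed above is: the fraction of device memory the user allows, minus what is resident right after the profile run, plus the memory the profile run's own KV blocks already occupy (so those blocks are not counted twice). A tiny standalone sketch with made-up numbers, just to make the arithmetic concrete; the function name and values are illustrative only:

def kv_cache_budget(total_memory: int, used_after_profile: int,
                    gpu_memory_utilization: float,
                    block_bytes: int, profiled_block_num: int) -> int:
    # Memory the user allows FastDeploy to touch on this device.
    total_available = int(total_memory * gpu_memory_utilization)
    # Subtract everything currently resident, then credit back the profile run's blocks.
    return total_available - used_after_profile + block_bytes * profiled_block_num

# Illustrative numbers only (bytes): a 16 GiB card, 90% utilization,
# 10 GiB resident after profiling, 2 MiB per block, 512 profiled blocks.
print(kv_cache_budget(16 << 30, 10 << 30, 0.9, 2 << 20, 512))
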
@@ -140,9 +145,13 @@ class XpuWorker(WorkerBase):
    def execute_model(
        self,
        model_forward_batch: Optional[List[Request]] = None,
        is_dummy_run: bool = False,
        num_running_requests: Optional[int] = None,
    ) -> Optional[ModelRunnerOutput]:
        """ """

        output = self.model_runner.execute_model(model_forward_batch)

        return output

    def exist_prefill(self):
@@ -151,7 +160,7 @@ class XpuWorker(WorkerBase):
        """
        return self.model_runner.exist_prefill()

    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int = -1) -> None:
        """Process new requests and then start the decode loop
        TODO(gongshaotian):The scheduler should schedule the handling of prefill,
        and workers and modelrunners should not perceive it.
|
@@ -8,7 +8,7 @@ aiozmq
openai>=1.93.0
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
redis
@@ -37,3 +37,4 @@ opentelemetry-instrumentation-mysql
opentelemetry-distro
opentelemetry-exporter-otlp
opentelemetry-instrumentation-fastapi
partial_json_parser

@@ -8,7 +8,7 @@ aiozmq
openai
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
redis

@@ -8,7 +8,7 @@ aiozmq
openai
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
redis
|
@@ -6,7 +6,23 @@ run_path="$DIR/../test/"
cd ${run_path}
ls

dirs=("layers" "operators" "worker" "utils")
exclude=("ci_use" "ce")
for d in */ ; do
    dir_name="${d%/}"
    if [[ -d "$dir_name" ]]; then
        skip=false
        for ex in "${exclude[@]}"; do
            if [[ "$dir_name" == "$ex" ]]; then
                skip=true
                break
            fi
        done
        if ! $skip; then
            dirs+=("$dir_name")
        fi
    fi
done

failed_tests_file="failed_tests.log"
> "$failed_tests_file"
disabled_tests=(
@@ -20,6 +36,10 @@ disabled_tests=(
    operators/test_stop_generation.py
    operators/test_air_topp_sampling.py
    operators/test_fused_moe.py
    layers/test_repetition_early_stopper.py
    operators/test_stop_generation_multi_ends.py
    utils/test_download.py
    graph_optimization/test_cuda_graph.py
)
is_disabled() {
    local test_file_rel="$1"
@@ -46,11 +66,25 @@ for dir in "${dirs[@]}"; do
            echo "Skipping disabled test: $test_file"
            continue
        fi

        python -m coverage run "$test_file"
        # TODO: Add a framework to manage unit test execution time
        timeout 600 python -m coverage run "$test_file"
        if [ $? -ne 0 ]; then
            echo "$test_file" >> "$failed_tests_file"
            fail=$((fail + 1))

            PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
            echo "==== PORT CLEAN AFTER UT FAILED ===="

            for port in "${PORTS[@]}"; do
                PIDS=$(lsof -t -i :$port)
                if [ -n "$PIDS" ]; then
                    echo "Port $port is occupied by PID(s): $PIDS"
                    echo "$PIDS" | xargs -r kill -9
                    echo "Port $port cleared"
                else
                    echo "Port $port is free"
                fi
            done
        else
            success=$((success + 1))
        fi
@@ -3,13 +3,10 @@ DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "$DIR"

# python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip install paddlepaddle-gpu==3.0.0.dev20250729 -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple


python -m pip install -r requirements.txt
python -m pip install jsonschema aistudio_sdk==0.3.5
bash build.sh || exit 1

failed_files=()
run_path="$DIR/../test/ci_use/"
2
setup.py
@@ -181,7 +181,7 @@ def get_name():

cmdclass_dict = {"bdist_wheel": CustomBdistWheel}
cmdclass_dict["build_ext"] = CMakeBuild
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.1.0-dev")
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.1.0+x1_patch")
cmdclass_dict["build_optl"] = PostInstallCommand

setup(
221
test/ce/server/test_base_chat.py
Normal file
@@ -0,0 +1,221 @@
|
||||
#!/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# @author DDDivano
|
||||
# encoding=utf-8 vi:ts=4:sw=4:expandtab:ft=python
|
||||
|
||||
"""
|
||||
some basic check for fd web api
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from core import TEMPLATE, URL, build_request_payload, send_request
|
||||
|
||||
|
||||
def test_stream_response():
|
||||
data = {
|
||||
"stream": True,
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个知识渊博的 AI 助手"},
|
||||
{"role": "user", "content": "讲讲爱因斯坦的相对论"},
|
||||
],
|
||||
"max_tokens": 10,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload, stream=True)
|
||||
|
||||
output = ""
|
||||
for line in resp.iter_lines(decode_unicode=True):
|
||||
if line.strip() == "" or not line.startswith("data: "):
|
||||
continue
|
||||
line = line[len("data: ") :]
|
||||
if line.strip() == "[DONE]":
|
||||
break
|
||||
chunk = json.loads(line)
|
||||
delta = chunk.get("choices", [{}])[0].get("delta", {})
|
||||
output += delta.get("content", "")
|
||||
|
||||
print("Stream输出:", output)
|
||||
assert "相对论" in output or len(output) > 0
|
||||
|
||||
|
||||
def test_system_prompt_effect():
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "system", "content": "请用一句话回答"},
|
||||
{"role": "user", "content": "什么是人工智能?"},
|
||||
],
|
||||
"max_tokens": 30,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload).json()
|
||||
content = resp["choices"][0]["message"]["content"]
|
||||
print("内容输出:", content)
|
||||
assert len(content) < 50
|
||||
|
||||
|
||||
def test_logprobs_enabled():
|
||||
data = {
|
||||
"stream": False,
|
||||
"logprobs": True,
|
||||
"top_logprobs": 5,
|
||||
"messages": [{"role": "user", "content": "非洲的首都是?"}],
|
||||
"max_tokens": 3,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload).json()
|
||||
logprob_data = resp["choices"][0].get("logprobs")
|
||||
print("LogProbs:", logprob_data)
|
||||
assert logprob_data is not None
|
||||
content_logprobs = logprob_data.get("content", [])
|
||||
assert isinstance(content_logprobs, list)
|
||||
assert all("token" in item for item in content_logprobs)
|
||||
|
||||
|
||||
def test_stop_sequence():
|
||||
data = {
|
||||
"stream": False,
|
||||
"stop": ["果冻"],
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "你要严格按照我接下来的话输出,输出冒号后面的内容,请输出:这是第一段。果冻这是第二段啦啦啦啦啦。",
|
||||
},
|
||||
],
|
||||
"max_tokens": 20,
|
||||
"top_p": 0,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload).json()
|
||||
content = resp["choices"][0]["message"]["content"]
|
||||
print("截断输出:", content)
|
||||
assert "第二段" not in content
|
||||
|
||||
|
||||
def test_sampling_parameters():
|
||||
data = {
|
||||
"stream": False,
|
||||
"temperature": 0,
|
||||
"top_p": 0,
|
||||
"messages": [
|
||||
{"role": "user", "content": "1+1=?,直接回答答案"},
|
||||
],
|
||||
"max_tokens": 50,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload).json()
|
||||
answer = resp["choices"][0]["message"]["content"]
|
||||
print("Sampling输出:", answer)
|
||||
assert any(ans in answer for ans in ["2", "二"])
|
||||
|
||||
|
||||
def test_multi_turn_conversation():
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "user", "content": "牛顿是谁?"},
|
||||
{"role": "assistant", "content": "牛顿是一位物理学家。"},
|
||||
{"role": "user", "content": "他提出了什么理论?"},
|
||||
],
|
||||
"max_tokens": 30,
|
||||
}
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
resp = send_request(URL, payload).json()
|
||||
content = resp["choices"][0]["message"]["content"]
|
||||
print("多轮记忆:", content)
|
||||
assert "三大运动定律" in content or "万有引力" in content
|
||||
|
||||
|
||||
def test_bad_words_filtering():
|
||||
banned_tokens = ["和", "呀"]
|
||||
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个助手,回答简洁清楚"},
|
||||
{"role": "user", "content": "请输出冒号后面的字: 我爱吃果冻,和苹果,香蕉,和荔枝"},
|
||||
],
|
||||
"top_p": 0,
|
||||
"max_tokens": 69,
|
||||
"bad_words": banned_tokens,
|
||||
}
|
||||
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
response = send_request(URL, payload).json()
|
||||
|
||||
content = response["choices"][0]["message"]["content"]
|
||||
print("生成内容:", content)
|
||||
|
||||
for word in banned_tokens:
|
||||
assert word not in content, f"bad_word '{word}' 不应出现在生成结果中"
|
||||
|
||||
print("test_bad_words_filtering 通过:生成结果未包含被禁词")
|
||||
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个助手,回答简洁清楚"},
|
||||
{"role": "user", "content": "请输出冒号后面的字,一模一样: 我爱吃果冻,苹果,香蕉,和荔枝呀呀呀"},
|
||||
],
|
||||
"top_p": 0,
|
||||
"max_tokens": 69,
|
||||
# "bad_words": banned_tokens,
|
||||
}
|
||||
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
response = send_request(URL, payload).json()
|
||||
|
||||
content = response["choices"][0]["message"]["content"]
|
||||
print("生成内容:", content)
|
||||
|
||||
for word in banned_tokens:
|
||||
assert word not in content, f"bad_word '{word}' 不应出现在生成结果中"
|
||||
|
||||
print("test_bad_words_filtering 通过:生成结果未包含被禁词")
|
||||
|
||||
|
||||
def test_bad_words_filtering1():
|
||||
banned_tokens = ["和", "呀"]
|
||||
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个助手,回答简洁清楚"},
|
||||
{"role": "user", "content": "请输出冒号后面的字: 我爱吃果冻,和苹果,香蕉,和荔枝"},
|
||||
],
|
||||
"top_p": 0,
|
||||
"max_tokens": 69,
|
||||
"bad_words": banned_tokens,
|
||||
}
|
||||
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
response = send_request(URL, payload).json()
|
||||
|
||||
content = response["choices"][0]["message"]["content"]
|
||||
print("生成内容:", content)
|
||||
|
||||
for word in banned_tokens:
|
||||
assert word not in content, f"bad_word '{word}' 不应出现在生成结果中"
|
||||
|
||||
print("test_bad_words_filtering 通过:生成结果未包含被禁词")
|
||||
word = "呀呀"
|
||||
data = {
|
||||
"stream": False,
|
||||
"messages": [
|
||||
{"role": "system", "content": "你是一个助手,回答简洁清楚"},
|
||||
{"role": "user", "content": "请输出冒号后面的字,一模一样: 我爱吃果冻,苹果,香蕉,和荔枝呀呀呀"},
|
||||
],
|
||||
"top_p": 0,
|
||||
"max_tokens": 69,
|
||||
}
|
||||
|
||||
payload = build_request_payload(TEMPLATE, data)
|
||||
response = send_request(URL, payload).json()
|
||||
|
||||
content = response["choices"][0]["message"]["content"]
|
||||
print("生成内容:", content)
|
||||
|
||||
assert word in content, f" '{word}' 应出现在生成结果中"
|
||||
|
||||
print("test_bad_words_filtering 通过:生成结果未包含被禁词")
|
78
test/entrypoints/openai/test_build_sample_logprobs.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from fastdeploy.entrypoints.llm import LLM
|
||||
from fastdeploy.worker.output import Logprob, LogprobsLists
|
||||
|
||||
|
||||
def get_patch_path(cls, method="__init__"):
|
||||
return f"{cls.__module__}.{cls.__qualname__}.{method}"
|
||||
|
||||
|
||||
class TestBuildSampleLogprobs(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Set up the test environment by creating an instance of the LLM class using Mock.
|
||||
"""
|
||||
patch_llm = get_patch_path(LLM)
|
||||
with patch(patch_llm, return_value=None):
|
||||
self.llm = LLM()
|
||||
# mock d data_processor
|
||||
self.llm.llm_engine = MagicMock()
|
||||
self.llm.llm_engine.data_processor.process_logprob_response.side_effect = (
|
||||
lambda ids, **kwargs: f"token_{ids[0]}"
|
||||
)
|
||||
|
||||
def test_build_sample_logprobs_basic(self):
|
||||
"""
|
||||
Test case for building sample logprobs when `topk_logprobs` is valid.
|
||||
"""
|
||||
logprob_token_ids = [[100, 101, 102]]
|
||||
logprobs = [[-0.1, -0.5, -1.0]]
|
||||
sampled_token_ranks = [0]
|
||||
|
||||
logprobs_lists = LogprobsLists(
|
||||
logprob_token_ids=logprob_token_ids, logprobs=logprobs, sampled_token_ranks=sampled_token_ranks
|
||||
)
|
||||
|
||||
result = self.llm._build_sample_logprobs(logprobs_lists, topk_logprobs=2)
|
||||
|
||||
expected = [
|
||||
{
|
||||
101: Logprob(logprob=-0.5, rank=1, decoded_token="token_101"),
|
||||
102: Logprob(logprob=-1.0, rank=2, decoded_token="token_102"),
|
||||
}
|
||||
]
|
||||
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_build_sample_logprobs_empty_input(self):
|
||||
"""
|
||||
Test case where `logprob_token_ids` is empty.
|
||||
"""
|
||||
logprobs_lists = MagicMock(spec=LogprobsLists)
|
||||
logprobs_lists.logprob_token_ids = []
|
||||
result = self.llm._build_sample_logprobs(logprobs_lists, topk_logprobs=2)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_build_sample_logprobs_invalid_topk(self):
|
||||
"""
|
||||
Test case where `topk` value exceeds length of first element in `logprob_token_ids`.
|
||||
"""
|
||||
logprobs_lists = MagicMock(spec=LogprobsLists)
|
||||
logprobs_lists.logprob_token_ids = [[100]]
|
||||
result = self.llm._build_sample_logprobs(logprobs_lists, topk_logprobs=2)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_token(self):
|
||||
"""
|
||||
Test case for decoding a single token ID.
|
||||
"""
|
||||
token_id = 123
|
||||
decoded = self.llm._decode_token(token_id)
|
||||
self.assertEqual(decoded, "token_123")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
112
test/entrypoints/openai/test_serving_completion.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import unittest
|
||||
from typing import List
|
||||
from unittest.mock import Mock
|
||||
|
||||
from fastdeploy.entrypoints.openai.serving_completion import (
|
||||
CompletionRequest,
|
||||
OpenAIServingCompletion,
|
||||
RequestOutput,
|
||||
)
|
||||
|
||||
|
||||
class TestOpenAIServingCompletion(unittest.TestCase):
|
||||
|
||||
def test_calc_finish_reason_tool_calls(self):
|
||||
# 创建一个模拟的engine_client,并设置reasoning_parser为"ernie_x1"
|
||||
engine_client = Mock()
|
||||
engine_client.reasoning_parser = "ernie_x1"
|
||||
# 创建一个OpenAIServingCompletion实例
|
||||
serving_completion = OpenAIServingCompletion(engine_client, "pid", "ips", 360)
|
||||
# 创建一个模拟的output,并设置finish_reason为"tool_calls"
|
||||
output = {"finish_reason": "tool_calls"}
|
||||
# 调用calc_finish_reason方法
|
||||
result = serving_completion.calc_finish_reason(None, 100, output)
|
||||
# 断言结果为"tool_calls"
|
||||
assert result == "tool_calls"
|
||||
|
||||
def test_calc_finish_reason_stop(self):
|
||||
# 创建一个模拟的engine_client,并设置reasoning_parser为"ernie_x1"
|
||||
engine_client = Mock()
|
||||
engine_client.reasoning_parser = "ernie_x1"
|
||||
# 创建一个OpenAIServingCompletion实例
|
||||
serving_completion = OpenAIServingCompletion(engine_client, "pid", "ips", 360)
|
||||
# 创建一个模拟的output,并设置finish_reason为其他值
|
||||
output = {"finish_reason": "other_reason"}
|
||||
# 调用calc_finish_reason方法
|
||||
result = serving_completion.calc_finish_reason(None, 100, output)
|
||||
# 断言结果为"stop"
|
||||
assert result == "stop"
|
||||
|
||||
def test_calc_finish_reason_length(self):
|
||||
# 创建一个模拟的engine_client
|
||||
engine_client = Mock()
|
||||
# 创建一个OpenAIServingCompletion实例
|
||||
serving_completion = OpenAIServingCompletion(engine_client, "pid", "ips", 360)
|
||||
# 创建一个模拟的output
|
||||
output = {}
|
||||
# 调用calc_finish_reason方法
|
||||
result = serving_completion.calc_finish_reason(100, 100, output)
|
||||
# 断言结果为"length"
|
||||
assert result == "length"
|
||||
|
||||
def test_request_output_to_completion_response(self):
|
||||
engine_client = Mock()
|
||||
# 创建一个OpenAIServingCompletion实例
|
||||
openai_serving_completion = OpenAIServingCompletion(engine_client, "pid", "ips", 360)
|
||||
final_res_batch: List[RequestOutput] = [
|
||||
{
|
||||
"prompt": "Hello, world!",
|
||||
"outputs": {
|
||||
"token_ids": [1, 2, 3],
|
||||
"text": " world!",
|
||||
"top_logprobs": {
|
||||
"a": 0.1,
|
||||
"b": 0.2,
|
||||
},
|
||||
},
|
||||
"output_token_ids": 3,
|
||||
},
|
||||
{
|
||||
"prompt": "Hello, world!",
|
||||
"outputs": {
|
||||
"token_ids": [4, 5, 6],
|
||||
"text": " world!",
|
||||
"top_logprobs": {
|
||||
"a": 0.3,
|
||||
"b": 0.4,
|
||||
},
|
||||
},
|
||||
"output_token_ids": 3,
|
||||
},
|
||||
]
|
||||
|
||||
request: CompletionRequest = Mock()
|
||||
request_id = "test_request_id"
|
||||
created_time = 1655136000
|
||||
model_name = "test_model"
|
||||
prompt_batched_token_ids = [[1, 2, 3], [4, 5, 6]]
|
||||
completion_batched_token_ids = [[7, 8, 9], [10, 11, 12]]
|
||||
|
||||
completion_response = openai_serving_completion.request_output_to_completion_response(
|
||||
final_res_batch=final_res_batch,
|
||||
request=request,
|
||||
request_id=request_id,
|
||||
created_time=created_time,
|
||||
model_name=model_name,
|
||||
prompt_batched_token_ids=prompt_batched_token_ids,
|
||||
completion_batched_token_ids=completion_batched_token_ids,
|
||||
text_after_process_list=["1", "1"],
|
||||
)
|
||||
|
||||
assert completion_response.id == request_id
|
||||
assert completion_response.created == created_time
|
||||
assert completion_response.model == model_name
|
||||
assert len(completion_response.choices) == 2
|
||||
|
||||
# 验证 choices 的 text 属性
|
||||
assert completion_response.choices[0].text == "Hello, world! world!"
|
||||
assert completion_response.choices[1].text == "Hello, world! world!"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
53
test/input/test_ernie_processor.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from fastdeploy.input.ernie_processor import ErnieProcessor
|
||||
|
||||
|
||||
class TestErnieProcessorProcessResponseDictStreaming(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# 创建 ErnieProcessor 实例的模拟对象
|
||||
with patch.object(ErnieProcessor, "__init__", return_value=None) as mock_init:
|
||||
self.processor = ErnieProcessor("model_path")
|
||||
mock_init.side_effect = lambda *args, **kwargs: print(f"__init__ called with {args}, {kwargs}")
|
||||
|
||||
# 设置必要的属性
|
||||
self.processor.tokenizer = MagicMock()
|
||||
self.processor.tokenizer.eos_token_id = 1
|
||||
self.processor.decode_status = {}
|
||||
self.processor.tool_parsers = {}
|
||||
|
||||
# 模拟 ids2tokens 方法
|
||||
def mock_ids2tokens(token_ids, task_id):
|
||||
return "delta_text", [2, 3], "previous_texts"
|
||||
|
||||
self.processor.ids2tokens = mock_ids2tokens
|
||||
|
||||
# 模拟推理解析器
|
||||
self.mock_reasoning_parser = MagicMock()
|
||||
self.mock_reasoning_parser.__class__.__name__ = "ErnieX1ReasoningParser"
|
||||
self.mock_reasoning_parser.extract_reasoning_content_streaming.return_value = ("reasoning", "text")
|
||||
self.processor.reasoning_parser = self.mock_reasoning_parser
|
||||
|
||||
# 模拟工具解析器
|
||||
self.mock_tool_parser = MagicMock()
|
||||
self.mock_tool_parser.extract_tool_calls_streaming.return_value = "tool_call"
|
||||
self.mock_tool_parser_obj = MagicMock()
|
||||
self.mock_tool_parser_obj.return_value = self.mock_tool_parser
|
||||
self.processor.tool_parser_obj = self.mock_tool_parser_obj
|
||||
|
||||
def test_process_response_dict_streaming_normal_case(self):
|
||||
"""测试正常情况下的流式响应处理"""
|
||||
# 准备输入
|
||||
response_dict = {"finished": False, "request_id": "req1", "outputs": {"token_ids": [4, 5]}}
|
||||
kwargs = {"enable_thinking": True}
|
||||
|
||||
# 调用方法
|
||||
result = self.processor.process_response_dict_streaming(response_dict, **kwargs)
|
||||
|
||||
# 验证结果
|
||||
self.assertEqual(result["outputs"]["raw_prediction"], "delta_text")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|