diff --git a/.github/workflows/_logprob_test_linux.yml b/.github/workflows/_logprob_test_linux.yml
index e55d937df..79f6d47e2 100644
--- a/.github/workflows/_logprob_test_linux.yml
+++ b/.github/workflows/_logprob_test_linux.yml
@@ -62,18 +62,22 @@ jobs:
           MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }}
         run: |
           runner_name="${{ runner.name }}"
-          last_char="${runner_name: -1}"
+          CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
+          DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
+          DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)

-          if [[ "$last_char" =~ [0-7] ]]; then
-            DEVICES="$last_char"
-          else
-            DEVICES="0"
-          fi
-
-          FLASK_PORT=$((9160 + DEVICES * 100))
-          FD_API_PORT=$((9180 + DEVICES * 100))
-          FD_ENGINE_QUEUE_PORT=$((9150 + DEVICES * 100))
-          FD_METRICS_PORT=$((9170 + DEVICES * 100))
+          FLASK_PORT=$((42068 + DEVICE_PORT * 100))
+          FD_API_PORT=$((42088 + DEVICE_PORT * 100))
+          FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
+          FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
+          echo "Test ENV Parameter:"
+          echo "========================================================="
+          echo "FLASK_PORT=${FLASK_PORT}"
+          echo "FD_API_PORT=${FD_API_PORT}"
+          echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
+          echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
+          echo "DEVICES=${DEVICES}"
+          echo "========================================================="

           CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}"
           echo "CACHE_DIR is set to ${CACHE_DIR}"
@@ -85,7 +89,24 @@ jobs:
             exit 1
           fi

-          PARENT_DIR=$(dirname "$WORKSPACE")
+          PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
+          LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log"
+          echo "==== LOG_FILE is ${LOG_FILE} ===="
+
+          echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE
+
+          for port in "${PORTS[@]}"; do
+            PIDS=$(lsof -t -i :$port || true)
+            if [ -n "$PIDS" ]; then
+              echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE
+              echo "$PIDS" | xargs -r kill -9
+              echo "Port $port cleared" | tee -a $LOG_FILE
+            else
+              echo "Port $port is free" | tee -a $LOG_FILE
+            fi
+          done
+
+          echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE

           docker run --ipc=host --pid=host --net=host \
             -v $(pwd):/workspace \
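Note: the port-derivation and port-cleanup preamble introduced here is repeated almost verbatim in `_pre_ce_test.yml` and `_unit_test_coverage.yml` below. A minimal sketch of what a shared helper could look like (hypothetical script, not part of this patch; it assumes runner names end in a card-ID suffix such as `-45`, so `CARD_ID=45` becomes `DEVICES=4,5` and `DEVICE_PORT=4`):

```bash
#!/usr/bin/env bash
# Hypothetical helper (e.g. scripts/ci_ports.sh); mirrors the logic added above.
set -euo pipefail

runner_name="$1"
CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}')
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)   # one GPU index per character
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)       # first GPU index seeds the port offset

FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))

# Free any port left occupied by a previous job on the same runner.
for port in "$FLASK_PORT" "$FD_API_PORT" "$FD_ENGINE_QUEUE_PORT" "$FD_METRICS_PORT"; do
    PIDS=$(lsof -t -i :"$port" || true)
    if [ -n "$PIDS" ]; then
        echo "Port $port occupied by PID(s): $PIDS, killing"
        echo "$PIDS" | xargs -r kill -9
    fi
done

echo "DEVICES=${DEVICES} FD_API_PORT=${FD_API_PORT}"
```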
+ echo "Test ENV Parameter:" + echo "=========================================================" + echo "FLASK_PORT=${FLASK_PORT}" + echo "FD_API_PORT=${FD_API_PORT}" + echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" + echo "FD_METRICS_PORT=${FD_METRICS_PORT}" + echo "DEVICES=${DEVICES}" + echo "=========================================================" + + CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}" + echo "CACHE_DIR is set to ${CACHE_DIR}" + if [ ! -f "${CACHE_DIR}/gitconfig" ]; then + touch "${CACHE_DIR}/gitconfig" fi - FD_API_PORT=$((9180 + gpu_id * 100)) - FD_ENGINE_QUEUE_PORT=$((9150 + gpu_id * 100)) - FD_METRICS_PORT=$((9170 + gpu_id * 100)) - PARENT_DIR=$(dirname "$WORKSPACE") - echo "PARENT_DIR:$PARENT_DIR" + PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT) + LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log" + echo "==== LOG_FILE is ${LOG_FILE} ====" + + echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE + + for port in "${PORTS[@]}"; do + PIDS=$(lsof -t -i :$port || true) + if [ -n "$PIDS" ]; then + echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE + echo "$PIDS" | xargs -r kill -9 + echo "Port $port cleared" | tee -a $LOG_FILE + else + echo "Port $port is free" | tee -a $LOG_FILE + fi + done + + echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE + docker run --rm --net=host -v $(pwd):/workspace -w /workspace \ - -v "/ssd4/GithubActions/gitconfig:/etc/gitconfig:ro" \ - -v "/ssd4/GithubActions/ModelData:/ModelData:ro" \ - -v "/ssd4/GithubActions/CacheDir:/root/.cache" \ - -v "/ssd4/GithubActions/ConfigDir:/root/.config" \ + -v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \ + -v "${CACHE_DIR}/.cache:/root/.cache" \ + -v "${CACHE_DIR}/ConfigDir:/root/.config" \ + -v "${MODEL_CACHE_DIR}:/ModelData:ro" \ -e "MODEL_PATH=/ModelData" \ -e "FD_API_PORT=${FD_API_PORT}" \ -e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \ -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \ + -e "FLASK_PORT=${FLASK_PORT}" \ -e "fd_wheel_url=${fd_wheel_url}" \ - --gpus '"device='"${DEVICES}"'"' ${docker_image} /bin/bash -c ' + --gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c ' git config --global --add safe.directory /workspace/FastDeploy cd FastDeploy python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/ diff --git a/.github/workflows/_unit_test_coverage.yml b/.github/workflows/_unit_test_coverage.yml index b959fff17..a29edb0aa 100644 --- a/.github/workflows/_unit_test_coverage.yml +++ b/.github/workflows/_unit_test_coverage.yml @@ -22,6 +22,11 @@ on: required: false type: string default: "" + MODEL_CACHE_DIR: + description: "Cache Dir Use" + required: false + type: string + default: "" jobs: run_tests_with_coverage: @@ -67,56 +72,102 @@ jobs: fd_wheel_url: ${{ inputs.FASTDEPLOY_WHEEL_URL }} CACHE_DIR: ${{ inputs.CACHE_DIR }} BASE_REF: ${{ github.event.pull_request.base.ref }} + MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }} run: | - set -x - runner_name="${{ runner.name }}" - CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}') - gpu_id=$(echo "$CARD_ID" | fold -w1 | paste -sd,) + set -x + runner_name="${{ runner.name }}" + CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}') + DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,) + DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1) - CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}" - echo "CACHE_DIR is set to ${CACHE_DIR}" - if [ ! 
-f "${CACHE_DIR}/gitconfig" ]; then - touch "${CACHE_DIR}/gitconfig" - fi - PARENT_DIR=$(dirname "$WORKSPACE") - echo "PARENT_DIR:$PARENT_DIR" - docker run --rm --net=host \ - --cap-add=SYS_PTRACE --privileged --shm-size=64G \ - -v $(pwd):/workspace -w /workspace \ - -v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \ - -v "${CACHE_DIR}/.cache:/root/.cache" \ - -v "${CACHE_DIR}/ConfigDir:/root/.config" \ - -e TZ="Asia/Shanghai" \ - -e "fd_wheel_url=${fd_wheel_url}" \ - -e "BASE_REF=${BASE_REF}" \ - --gpus "\"device=${gpu_id}\"" ${docker_image} /bin/bash -c ' + FLASK_PORT=$((42068 + DEVICE_PORT * 100)) + FD_API_PORT=$((42088 + DEVICE_PORT * 100)) + FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100)) + FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100)) + echo "Test ENV Parameter:" + echo "=========================================================" + echo "FLASK_PORT=${FLASK_PORT}" + echo "FD_API_PORT=${FD_API_PORT}" + echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" + echo "FD_METRICS_PORT=${FD_METRICS_PORT}" + echo "DEVICES=${DEVICES}" + echo "=========================================================" - git config --global --add safe.directory /workspace/FastDeploy - cd FastDeploy - python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/ + CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}" + echo "CACHE_DIR is set to ${CACHE_DIR}" + if [ ! -f "${CACHE_DIR}/gitconfig" ]; then + touch "${CACHE_DIR}/gitconfig" + fi - pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple + PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT) + LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log" + echo "==== LOG_FILE is ${LOG_FILE} ====" + echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE + + for port in "${PORTS[@]}"; do + PIDS=$(lsof -t -i :$port || true) + if [ -n "$PIDS" ]; then + echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE + echo "$PIDS" | xargs -r kill -9 + echo "Port $port cleared" | tee -a $LOG_FILE + else + echo "Port $port is free" | tee -a $LOG_FILE + fi + done + + echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE + + docker run --rm --net=host \ + --cap-add=SYS_PTRACE --shm-size=64G \ + -v $(pwd):/workspace -w /workspace \ + -v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \ + -v "${CACHE_DIR}/.cache:/root/.cache" \ + -v "${CACHE_DIR}/ConfigDir:/root/.config" \ + -v "${MODEL_CACHE_DIR}:/ModelData:ro" \ + -e "MODEL_PATH=/ModelData" \ + -e "FD_API_PORT=${FD_API_PORT}" \ + -e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \ + -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \ + -e "FLASK_PORT=${FLASK_PORT}" \ + -e TZ="Asia/Shanghai" \ + -e "fd_wheel_url=${fd_wheel_url}" \ + -e "BASE_REF=${BASE_REF}" \ + --gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c ' + + git config --global --add safe.directory /workspace/FastDeploy + cd FastDeploy + python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/ + + pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple + + python -m pip install coverage + python -m pip install diff-cover + python -m pip install ${fd_wheel_url} + if [ -d "test/plugins" ]; then + cd test/plugins + python setup.py install + cd ../.. 
+              else
+                echo "Warning: test/plugins directory not found, skipping setup.py install"
+              fi
+              export COVERAGE_FILE=/workspace/FastDeploy/coveragedata/.coverage
+              export COVERAGE_RCFILE=/workspace/FastDeploy/scripts/.coveragerc
+              TEST_EXIT_CODE=0
+              bash scripts/coverage_run.sh || TEST_EXIT_CODE=8
+              git diff origin/${BASE_REF}..HEAD --unified=0 > diff.txt
+              echo "TEST_EXIT_CODE=${TEST_EXIT_CODE}" >> exit_code.env
+              coverage combine coveragedata/
+              coverage xml -o python_coverage_all.xml
+              COVERAGE_EXIT_CODE=0
+              diff-cover python_coverage_all.xml --diff-file=diff.txt --fail-under=80 --json-report diff_coverage.json || COVERAGE_EXIT_CODE=9
+              echo "COVERAGE_EXIT_CODE=${COVERAGE_EXIT_CODE}" >> exit_code.env
+              python scripts/generate_diff_coverage_xml.py diff.txt python_coverage_all.xml
+            '
+            if [ -f FastDeploy/exit_code.env ]; then
+              cat FastDeploy/exit_code.env >> $GITHUB_ENV
+            fi

-            python -m pip install coverage
-            python -m pip install diff-cover
-            python -m pip install ${fd_wheel_url}
-            export COVERAGE_FILE=/workspace/FastDeploy/coveragedata/.coverage
-            export COVERAGE_RCFILE=/workspace/FastDeploy/scripts/.coveragerc
-            TEST_EXIT_CODE=0
-            bash scripts/coverage_run.sh || TEST_EXIT_CODE=8
-            git diff origin/${BASE_REF}..HEAD --unified=0 > diff.txt
-            echo "TEST_EXIT_CODE=${TEST_EXIT_CODE}" >> exit_code.env
-            coverage combine coveragedata/
-            coverage xml -o python_coverage_all.xml
-            COVERAGE_EXIT_CODE=0
-            diff-cover python_coverage_all.xml --diff-file=diff.txt --fail-under=80 --json-report diff_coverage.json || COVERAGE_EXIT_CODE=9
-            echo "COVERAGE_EXIT_CODE=${COVERAGE_EXIT_CODE}" >> exit_code.env
-            python scripts/generate_diff_coverage_xml.py diff.txt python_coverage_all.xml
-          '
-          if [ -f FastDeploy/exit_code.env ]; then
-            cat FastDeploy/exit_code.env >> $GITHUB_ENV
-          fi
       - name: Upload unit resule and diff coverage to bos
         id: cov_upload
         shell: bash
         run: |
@@ -152,32 +203,40 @@ jobs:
             echo "unittest_failed_url=${UNIT_TEST_RESULT_URL}" >> $GITHUB_OUTPUT
             echo "unittest_failed_url=${UNIT_TEST_RESULT_URL}" >> $GITHUB_ENV
           fi
-      - name: Determine Unit Succ and whether the coverage rate reaches 80%
+      - name: Check Unit Test Success
         shell: bash
         run: |
+          cd FastDeploy
           if [ "$TEST_EXIT_CODE" -eq 8 ]; then
+            filename=$(basename "$unittest_failed_url")
            if [ -z "${unittest_failed_url}" ]; then
              echo "No diff unit failed file URL provided."
            else
-              wget ${unittest_failed_url} || echo "Download unittest file failed, but continuing..."
+              rm -rf "${filename}"
+              wget -O ${filename} ${unittest_failed_url} || echo "Download unittest file failed, but continuing..."
            fi
            echo "Unit tests failed (exit code 8)"
-            filename=$(basename "$unittest_failed_url")
            if [ -f "${filename}" ];then
              echo "Failed test cases:"
              cat "${filename}"
            fi
            exit "$TEST_EXIT_CODE"
          fi
+          echo "All tests passed"
+      - name: Verify Code Coverage Threshold (80%)
+        shell: bash
+        run: |
+          cd FastDeploy
          if [ "$COVERAGE_EXIT_CODE" -eq 9 ]; then
            echo "Coverage generation failed (exit code 9)"
+            filename=$(basename "$diff_cov_result_json_url")
            if [ -z "${diff_cov_result_json_url}" ]; then
              echo "No diff cov result file URL provided."
            else
-              wget ${diff_cov_result_json_url} || echo "Download cov json file failed, but continuing..."
+              rm -rf "${filename}"
+              wget -O ${filename} ${diff_cov_result_json_url} || echo "Download cov json file failed, but continuing..."
            fi
-            filename=$(basename "$diff_cov_result_json_url")
            if [ -f "${filename}" ];then
              echo "Failed test cases:"
              if command -v jq >/dev/null 2>&1; then
@@ -188,7 +247,7 @@ jobs:
            fi
            exit "$COVERAGE_EXIT_CODE"
          fi
-          echo "All tests and coverage passed"
+          echo "coverage passed"
          exit 0

  diff_coverage_report:
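With the unit-test job now exporting `TEST_EXIT_CODE` and `COVERAGE_EXIT_CODE` through `exit_code.env`, the same diff-coverage gate can be approximated locally using the commands the workflow runs. A sketch, under the assumptions that `origin/develop` stands in for `${BASE_REF}` and that the FastDeploy wheel, `coverage`, and `diff-cover` are already installed:

```bash
# Local approximation of the CI coverage gate (run from the FastDeploy checkout).
export COVERAGE_FILE=$PWD/coveragedata/.coverage
export COVERAGE_RCFILE=$PWD/scripts/.coveragerc

bash scripts/coverage_run.sh || echo "unit tests failed"
git diff origin/develop..HEAD --unified=0 > diff.txt

coverage combine coveragedata/
coverage xml -o python_coverage_all.xml
# Non-zero exit when lines changed by this branch are less than 80% covered.
diff-cover python_coverage_all.xml --diff-file=diff.txt --fail-under=80 --json-report diff_coverage.json
```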
"Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`" ) llm_logger.error(error_msg) @@ -545,7 +545,7 @@ class LLMEngine: for single_stop_seq_len in stop_seqs_len: if single_stop_seq_len > stop_seqs_max_len: error_msg = ( - f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit max_model_len({stop_seqs_max_len})." + f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})." "Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`" ) llm_logger.error(error_msg) diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index d1116980c..4aecabcd5 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -97,13 +97,13 @@ class ResourceManagerV1(ResourceManager): def _prepare_preempt_task(self, request): return ScheduledPreemptTask(idx=request.idx, request_id=request.request_id) - + def reschedule_preempt_task(self, request_id): with self.lock: if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests: request = self.requests[request_id] self.waiting.appendleft(request) - self.to_be_rescheduled_request_id_set.remove(request_id) + self.to_be_rescheduled_request_id_set.remove(request_id) def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_reqs): can_schedule = True @@ -142,26 +142,31 @@ class ResourceManagerV1(ResourceManager): input_ids_lst = request.prompt_token_ids + request.output_token_ids input_ids = paddle.to_tensor(input_ids_lst, dtype="int64") - grid_thw = [] - for one in inputs["grid_thw"]: - if one[0] == 1: - grid_thw.append(one) - else: - grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2)) - + input_ids = paddle.to_tensor(input_ids_lst, dtype="int64") image_patch_id = inputs["image_patch_id"] - grid_thw = paddle.to_tensor(grid_thw, dtype="int64") + if request.multimodal_img_boundaries is None: + grid_thw = [] + for one in inputs["grid_thw"]: + if one[0] == 1: + grid_thw.append(one) + else: + grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2)) + + grid_thw = paddle.to_tensor(grid_thw, dtype="int64") from fastdeploy.model_executor.ops.gpu import get_img_boundaries request.multimodal_img_boundaries = get_img_boundaries( task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id ).numpy() + grid_thw = grid_thw.numpy().reshape([-1, 3]) + inputs["grid_thw"] = grid_thw + + grid_thw = inputs["grid_thw"] img_boundaries_idx = request.multimodal_img_boundaries[0] img_num_per_boundary = request.multimodal_img_boundaries[1] ori_prompt_len = img_boundaries_idx[-1].item() - grid_thw = grid_thw.numpy().reshape([-1, 3]) pre_end_idx = request.num_computed_tokens new_end_idx = pre_end_idx + num_new_tokens if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id: @@ -421,9 +426,15 @@ class ResourceManagerV1(ResourceManager): self.running.remove(request) request.status = RequestStatus.FINISHED self._free_blocks(request) - if request.request_id in self.to_be_rescheduled_request_id_set: # finished after preempted, blocks have been recycled. 
-                    self.to_be_rescheduled_request_id_set.remove(request.request_id)  # just remove from to_be_rescheduled_request_id_set
-                if request in self.waiting:  # after finished, this request still scheduled from preempted to waiting, unexpected error, should not be here
+                if (
+                    request.request_id in self.to_be_rescheduled_request_id_set
+                ):  # finished after preempted, blocks have been recycled.
+                    self.to_be_rescheduled_request_id_set.remove(
+                        request.request_id
+                    )  # just remove from to_be_rescheduled_request_id_set
+                if (
+                    request in self.waiting
+                ):  # after finished, this request still scheduled from preempted to waiting, unexpected error, should not be here
                     raise RuntimeError(f"request {request.request_id} scheduled into waiting list, after finished")
                 self.tasks_list[request.idx] = None
                 self.stop_flags[request.idx] = True

diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index a1c787162..e7edacb26 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -154,7 +154,7 @@ class EngineClient:
         max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
         if len(stop_seqs_len) > max_stop_seqs_num:
             error_msg = (
-                f"Length of stop ({stop_seqs_len}) exceeds the limit max_model_len({max_stop_seqs_num})."
+                f"Length of stop ({stop_seqs_len}) exceeds the limit max_stop_seqs_num({max_stop_seqs_num})."
                 "Please reduce the number of stop or set a lager max_stop_seqs_num by `FD_MAX_STOP_SEQS_NUM`"
             )
             api_server_logger.error(error_msg)
@@ -163,7 +163,7 @@ class EngineClient:
         for single_stop_seq_len in stop_seqs_len:
             if single_stop_seq_len > stop_seqs_max_len:
                 error_msg = (
-                    f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit max_model_len({stop_seqs_max_len})."
+                    f"Length of stop_seqs({single_stop_seq_len}) exceeds the limit stop_seqs_max_len({stop_seqs_max_len})."
                     "Please reduce the length of stop sequences or set a larger stop_seqs_max_len by `FD_STOP_SEQS_MAX_LEN`"
                 )
                 api_server_logger.error(error_msg)

diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index dd2c1fb69..53168abc0 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -168,9 +168,9 @@ async def connection_manager():
         yield
     except asyncio.TimeoutError:
         api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
-        if connection_semaphore.locked():
-            connection_semaphore.release()
-        raise HTTPException(status_code=429, detail="Too many requests")
+        raise HTTPException(
+            status_code=429, detail=f"Too many requests, current max concurrency is {args.max_concurrency}"
+        )


 def wrap_streaming_generator(original_generator: AsyncGenerator):
@@ -183,7 +183,7 @@ def wrap_streaming_generator(original_generator: AsyncGenerator):
             async for chunk in original_generator:
                 yield chunk
         finally:
-            api_server_logger.debug(f"release: {connection_semaphore.status()}")
+            api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
             connection_semaphore.release()

     return wrapped_generator
@@ -247,6 +247,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
     """
     Create a chat completion for the provided prompt and parameters.
""" + api_server_logger.info(f"Chat Received request: {request.model_dump_json()}") if app.state.dynamic_load_weight: status, msg = app.state.engine_client.is_workers_alive() if not status: @@ -257,9 +258,11 @@ async def create_chat_completion(request: ChatCompletionRequest): generator = await app.state.chat_handler.create_chat_completion(request) if isinstance(generator, ErrorResponse): connection_semaphore.release() + api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}") return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code) elif isinstance(generator, ChatCompletionResponse): connection_semaphore.release() + api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}") return JSONResponse(content=generator.model_dump()) else: wrapped_generator = wrap_streaming_generator(generator) @@ -275,6 +278,7 @@ async def create_completion(request: CompletionRequest): """ Create a completion for the provided prompt and parameters. """ + api_server_logger.info(f"Completion Received request: {request.model_dump_json()}") if app.state.dynamic_load_weight: status, msg = app.state.engine_client.is_workers_alive() if not status: diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 0ee0a3423..57b5945e3 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -78,45 +78,45 @@ class OpenAIServingChat: api_server_logger.error(err_msg) return ErrorResponse(message=err_msg, code=400) - if request.user is not None: - request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}" - else: - request_id = f"chatcmpl-{uuid.uuid4()}" - api_server_logger.info(f"create chat completion request: {request_id}") - text_after_process = None try: - current_req_dict = request.to_dict_for_infer(request_id) - current_req_dict["arrival_time"] = time.time() - prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) - text_after_process = current_req_dict.get("text_after_process") - if isinstance(prompt_token_ids, np.ndarray): - prompt_token_ids = prompt_token_ids.tolist() - except Exception as e: - return ErrorResponse(code=400, message=str(e)) - - del current_req_dict - - try: - api_server_logger.debug(f"{self.engine_client.semaphore.status()}") if self.max_waiting_time < 0: await self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - except Exception: - return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + api_server_logger.debug(f"current waiting request {self.engine_client.semaphore.status()}") - if request.stream: - return self.chat_completion_stream_generator( - request, request_id, request.model, prompt_token_ids, text_after_process - ) - else: + if request.user is not None: + request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}" + else: + request_id = f"chatcmpl-{uuid.uuid4()}" + api_server_logger.info(f"create chat completion request: {request_id}") + text_after_process = None try: - return await self.chat_completion_full_generator( - request, request_id, request.model, prompt_token_ids, text_after_process - ) + current_req_dict = request.to_dict_for_infer(request_id) + current_req_dict["arrival_time"] = time.time() + prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) + text_after_process = current_req_dict.get("text_after_process") + if 
+                if isinstance(prompt_token_ids, np.ndarray):
+                    prompt_token_ids = prompt_token_ids.tolist()
             except Exception as e:
                 return ErrorResponse(code=400, message=str(e))
+            del current_req_dict
+
+            if request.stream:
+                return self.chat_completion_stream_generator(
+                    request, request_id, request.model, prompt_token_ids, text_after_process
+                )
+            else:
+                try:
+                    return await self.chat_completion_full_generator(
+                        request, request_id, request.model, prompt_token_ids, text_after_process
+                    )
+                except Exception as e:
+                    return ErrorResponse(code=400, message=str(e))
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
     def _create_streaming_error_response(self, message: str) -> str:
         error_response = ErrorResponse(
             code=400,
@@ -240,6 +240,7 @@ class OpenAIServingChat:
                             prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens),
                         )
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
+                    api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}")
                     first_iteration = False

                 output = res["outputs"]
@@ -274,6 +275,7 @@ class OpenAIServingChat:
                         logprobs=logprobs_res,
                         arrival_time=arrival_time,
                     )
+
                     if res["finished"]:
                         num_choices -= 1
                         work_process_metrics.e2e_request_latency.observe(
@@ -305,6 +307,9 @@ class OpenAIServingChat:
                 if len(choices) == max_streaming_response_tokens or res["finished"]:
                     chunk.choices = choices
                     yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
+                    # Log the final chunk of the stream
+                    if res["finished"]:
+                        api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
                     choices = []

             if choices:
@@ -458,13 +463,15 @@ class OpenAIServingChat:
                 prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
             )
         work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
-        return ChatCompletionResponse(
+        res = ChatCompletionResponse(
             id=request_id,
             created=created_time,
             model=model_name,
             choices=choices,
             usage=usage,
         )
+        api_server_logger.info(f"Chat response: {res.model_dump_json()}")
+        return res

     def _create_chat_logprobs(
         self,

diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py
index bec869699..1e8ad0f86 100644
--- a/fastdeploy/entrypoints/openai/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/serving_completion.py
@@ -101,6 +101,13 @@ class OpenAIServingCompletion:
             api_server_logger.info(f"start inference for request {num_choices}")
             prompt_batched_token_ids = []
             text_after_process_list = []
+            try:
+                if self.max_waiting_time < 0:
+                    await self.engine_client.semaphore.acquire()
+                else:
+                    await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
+            except Exception:
+                return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
             try:
                 for idx, prompt in enumerate(request_prompts):
                     request_id_idx = f"{request_id}-{idx}"
@@ -117,14 +124,6 @@ class OpenAIServingCompletion:

                 del current_req_dict

-            try:
-                if self.max_waiting_time < 0:
-                    await self.engine_client.semaphore.acquire()
-                else:
-                    await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-            except Exception:
-                return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
-
             if request.stream:
                 return self.completion_stream_generator(
                     request=request,
@@ -221,8 +220,7 @@ class OpenAIServingCompletion:
                             valid_results[rid] = data
                             num_choices -= 1
                             break
-
-        return self.request_output_to_completion_response(
+            res = self.request_output_to_completion_response(
                 final_res_batch=valid_results,
                 request=request,
                 request_id=request_id,
@@ -232,6 +230,8 @@ class OpenAIServingCompletion:
                 completion_batched_token_ids=completion_batched_token_ids,
                 text_after_process_list=text_after_process_list,
             )
+            api_server_logger.info(f"Completion response: {res.model_dump_json()}")
+            return res
         except Exception as e:
             api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
             raise
@@ -323,6 +323,9 @@ class OpenAIServingCompletion:
                             ],
                         )
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
+                        api_server_logger.info(
+                            f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}"
+                        )
                     first_iteration[idx] = False

                 self.engine_client.data_processor.process_response_dict(
@@ -376,6 +379,15 @@ class OpenAIServingCompletion:
                         choices[-1].finish_reason = self.calc_finish_reason(
                             request.max_tokens, output_tokens[idx], output, tool_called
                         )
+                        send_idx = output.get("send_idx")
+                        # Only log when send_idx is explicitly 0
+                        if send_idx == 0 and not request.return_token_ids:
+                            chunk_temp = chunk
+                            chunk_temp.choices = choices
+                            api_server_logger.info(
+                                f"Completion Streaming response send_idx 0: {chunk_temp.model_dump_json()}"
+                            )
+                            del chunk_temp

                     if len(choices) == max_streaming_response_tokens or res["finished"]:
                         chunk = CompletionStreamResponse(
@@ -401,6 +413,7 @@ class OpenAIServingCompletion:
                                 ),
                             )
                             yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
+                        api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}")
                     if choices:
                         chunk.choices = choices
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"

diff --git a/fastdeploy/input/ernie_tokenizer.py b/fastdeploy/input/ernie_tokenizer.py
index 2bbc798c5..057559015 100644
--- a/fastdeploy/input/ernie_tokenizer.py
+++ b/fastdeploy/input/ernie_tokenizer.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 """

-# cipher_token=WjI1fQOvhN # do not edit this line
-
 import os
 import re
 from shutil import copyfile

diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py
index 05e55929d..5a9b6418d 100644
--- a/fastdeploy/inter_communicator/zmq_client.py
+++ b/fastdeploy/inter_communicator/zmq_client.py
@@ -67,6 +67,7 @@ class ZmqClient:
         """
         self.router = self.context.socket(zmq.ROUTER)
         self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
+        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
         self.router.setsockopt(zmq.SNDTIMEO, -1)
         self.router.bind(f"ipc://{self.router_path}")

@@ -111,7 +112,6 @@ class ZmqClient:
         """
         if self.router is None:
             raise RuntimeError("Router socket not created. Call create_router() first.")
-
         while self.running:
             with self.mutex:
                 if req_id not in self.req_dict:
@@ -124,7 +124,11 @@ class ZmqClient:
                     continue
                 else:
                     break
-
+        if self.req_dict[req_id] == -1:
+            if data[-1].finished:
+                with self.mutex:
+                    self.req_dict.pop(req_id, None)
+            return
         try:
             start_send = time.time()
             if self.aggregate_send:
@@ -133,7 +137,9 @@ class ZmqClient:
                 result = msgpack.packb([response.to_dict() for response in data])
             self.router.send_multipart([self.req_dict[req_id], b"", result])
             llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
-
+        except zmq.ZMQError as e:
+            llm_logger.error(f"[{req_id}] zmq error: {e}")
+            self.req_dict[req_id] = -1
         except Exception as e:
             llm_logger.error(f"Send result to zmq client failed: {e}")

diff --git a/fastdeploy/model_executor/layers/activation.py b/fastdeploy/model_executor/layers/activation.py
index 977a4f2f4..04476a590 100644
--- a/fastdeploy/model_executor/layers/activation.py
+++ b/fastdeploy/model_executor/layers/activation.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 """

-# cipher_token=WjI1fQOvhN # do not edit this line
 from typing import Optional

 import paddle

diff --git a/scripts/coverage_run.sh b/scripts/coverage_run.sh
index 6b6cbbf85..443f2e1c3 100644
--- a/scripts/coverage_run.sh
+++ b/scripts/coverage_run.sh
@@ -66,11 +66,25 @@ for dir in "${dirs[@]}"; do
             echo "Skipping disabled test: $test_file"
             continue
         fi
-
-        python -m coverage run "$test_file"
+        # TODO: Add a framework to manage unit test execution time
+        timeout 600 python -m coverage run "$test_file"
         if [ $? -ne 0 ]; then
             echo "$test_file" >> "$failed_tests_file"
             fail=$((fail + 1))
+
+            PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
+            echo "==== PORT CLEAN AFTER UT FAILED ===="
+
+            for port in "${PORTS[@]}"; do
+                PIDS=$(lsof -t -i :$port)
+                if [ -n "$PIDS" ]; then
+                    echo "Port $port is occupied by PID(s): $PIDS"
+                    echo "$PIDS" | xargs -r kill -9
+                    echo "Port $port cleared"
+                else
+                    echo "Port $port is free"
+                fi
+            done
         else
             success=$((success + 1))
         fi

diff --git a/test/entrypoints/openai/test_serving_completion.py b/test/entrypoints/openai/test_serving_completion.py
index 8d1a4eb66..4c7404a79 100644
--- a/test/entrypoints/openai/test_serving_completion.py
+++ b/test/entrypoints/openai/test_serving_completion.py
@@ -95,6 +95,7 @@ class TestOpenAIServingCompletion(unittest.TestCase):
             model_name=model_name,
             prompt_batched_token_ids=prompt_batched_token_ids,
             completion_batched_token_ids=completion_batched_token_ids,
+            text_after_process_list=["1", "1"],
         )
         assert completion_response.id == request_id
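A side note on the `timeout 600` wrapper added to `scripts/coverage_run.sh` above: GNU `timeout` exits with status 124 when the limit is hit, so a hung test is already counted by the existing `$? -ne 0` check. If timeouts ever need to be reported separately from ordinary failures, a helper along these lines could do it (hypothetical sketch, not part of this patch; `failed_tests_file` is the variable the script already uses):

```bash
# Hypothetical helper: distinguish "timed out" from "failed" when recording results.
run_one_test() {
    local test_file="$1"
    timeout 600 python -m coverage run "$test_file"
    local rc=$?
    if [ "$rc" -eq 124 ]; then
        echo "TIMEOUT: $test_file exceeded 600s" >> "$failed_tests_file"
    elif [ "$rc" -ne 0 ]; then
        echo "FAILED ($rc): $test_file" >> "$failed_tests_file"
    fi
    return "$rc"
}
```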