From fb18f3092d70b786a2b2400abda3fcb594988eed Mon Sep 17 00:00:00 2001 From: jiangjiajun Date: Mon, 9 Jun 2025 20:26:53 +0800 Subject: [PATCH] [LLM] Add output module and polish docs --- .clang-format | 180 ---------------- .clang_format.hook | 15 -- .cpplint_pre_commit.hook | 60 ------ .gitignore | 225 ++++++++++++++------ .pre-commit-config.yaml | 91 ++++---- README.md | 20 +- fastdeploy/output/__init__.py | 15 ++ fastdeploy/output/token_processor.py | 306 +++++++++++++++++++++++++++ 8 files changed, 548 insertions(+), 364 deletions(-) delete mode 100644 .clang-format delete mode 100644 .clang_format.hook delete mode 100644 .cpplint_pre_commit.hook create mode 100644 fastdeploy/output/__init__.py create mode 100644 fastdeploy/output/token_processor.py diff --git a/.clang-format b/.clang-format deleted file mode 100644 index 4ea70510d..000000000 --- a/.clang-format +++ /dev/null @@ -1,180 +0,0 @@ ---- -Language: Cpp -# BasedOnStyle: LLVM -AccessModifierOffset: -1 -AlignAfterOpenBracket: Align -AlignArrayOfStructures: None -AlignConsecutiveMacros: None -AlignConsecutiveAssignments: None -AlignConsecutiveBitFields: None -AlignConsecutiveDeclarations: None -AlignEscapedNewlines: Right -AlignOperands: Align -AlignTrailingComments: true -AllowAllArgumentsOnNextLine: true -AllowAllConstructorInitializersOnNextLine: true -AllowAllParametersOfDeclarationOnNextLine: true -AllowShortEnumsOnASingleLine: true -AllowShortBlocksOnASingleLine: Never -AllowShortCaseLabelsOnASingleLine: false -AllowShortFunctionsOnASingleLine: All -AllowShortLambdasOnASingleLine: All -AllowShortIfStatementsOnASingleLine: Never -AllowShortLoopsOnASingleLine: false -AlwaysBreakAfterDefinitionReturnType: None -AlwaysBreakAfterReturnType: None -AlwaysBreakBeforeMultilineStrings: false -AlwaysBreakTemplateDeclarations: MultiLine -AttributeMacros: - - __capability -BinPackArguments: true -BinPackParameters: true -BraceWrapping: - AfterCaseLabel: false - AfterClass: false - AfterControlStatement: Never - 
AfterEnum: false - AfterFunction: false - AfterNamespace: false - AfterObjCDeclaration: false - AfterStruct: false - AfterUnion: false - AfterExternBlock: false - BeforeCatch: false - BeforeElse: false - BeforeLambdaBody: false - BeforeWhile: false - IndentBraces: false - SplitEmptyFunction: true - SplitEmptyRecord: true - SplitEmptyNamespace: true -BreakBeforeBinaryOperators: None -BreakBeforeConceptDeclarations: true -BreakBeforeBraces: Attach -BreakBeforeInheritanceComma: false -BreakInheritanceList: BeforeColon -BreakBeforeTernaryOperators: true -BreakConstructorInitializersBeforeComma: false -BreakConstructorInitializers: BeforeColon -BreakAfterJavaFieldAnnotations: false -BreakStringLiterals: true -ColumnLimit: 80 -# CommentPragmas: '^ IWYU pragma:' -# CommentPragmas: '^[^ ]' -CommentPragmas: '^\\.+' -CompactNamespaces: false -ConstructorInitializerAllOnOneLineOrOnePerLine: false -ConstructorInitializerIndentWidth: 4 -ContinuationIndentWidth: 4 -Cpp11BracedListStyle: true -DeriveLineEnding: true -DerivePointerAlignment: false -DisableFormat: false -EmptyLineAfterAccessModifier: Never -EmptyLineBeforeAccessModifier: LogicalBlock -ExperimentalAutoDetectBinPacking: false -FixNamespaceComments: true -ForEachMacros: - - foreach - - Q_FOREACH - - BOOST_FOREACH -IfMacros: - - KJ_IF_MAYBE -IncludeBlocks: Preserve -IncludeCategories: - - Regex: '^"(llvm|llvm-c|clang|clang-c)/' - Priority: 2 - SortPriority: 0 - CaseSensitive: false - - Regex: '^(<|"(gtest|gmock|isl|json)/)' - Priority: 3 - SortPriority: 0 - CaseSensitive: false - - Regex: '.*' - Priority: 1 - SortPriority: 0 - CaseSensitive: false -IncludeIsMainRegex: '(Test)?$' -IncludeIsMainSourceRegex: '' -IndentAccessModifiers: false -IndentCaseLabels: false -IndentCaseBlocks: false -IndentGotoLabels: true -IndentPPDirectives: None -IndentExternBlock: AfterExternBlock -IndentRequires: false -IndentWidth: 2 -IndentWrappedFunctionNames: false -InsertTrailingCommas: None -JavaScriptQuotes: Leave 
-JavaScriptWrapImports: true -KeepEmptyLinesAtTheStartOfBlocks: true -LambdaBodyIndentation: Signature -MacroBlockBegin: '' -MacroBlockEnd: '' -MaxEmptyLinesToKeep: 1 -NamespaceIndentation: None -ObjCBinPackProtocolList: Auto -ObjCBlockIndentWidth: 2 -ObjCBreakBeforeNestedBlockParam: true -ObjCSpaceAfterProperty: false -ObjCSpaceBeforeProtocolList: true -PenaltyBreakAssignment: 2 -PenaltyBreakBeforeFirstCallParameter: 19 -PenaltyBreakComment: 300 -PenaltyBreakFirstLessLess: 120 -PenaltyBreakString: 1000 -PenaltyBreakTemplateDeclaration: 10 -PenaltyExcessCharacter: 1000000 -PenaltyReturnTypeOnItsOwnLine: 60 -PenaltyIndentedWhitespace: 0 -PointerAlignment: Left -PPIndentWidth: -1 -ReferenceAlignment: Pointer -ReflowComments: false -ShortNamespaceLines: 1 -SortIncludes: CaseSensitive -SortJavaStaticImport: Before -SortUsingDeclarations: true -SpaceAfterCStyleCast: false -SpaceAfterLogicalNot: false -SpaceAfterTemplateKeyword: true -SpaceBeforeAssignmentOperators: true -SpaceBeforeCaseColon: false -SpaceBeforeCpp11BracedList: false -SpaceBeforeCtorInitializerColon: true -SpaceBeforeInheritanceColon: true -SpaceBeforeParens: ControlStatements -SpaceAroundPointerQualifiers: Default -SpaceBeforeRangeBasedForLoopColon: true -SpaceInEmptyBlock: false -SpaceInEmptyParentheses: false -SpacesBeforeTrailingComments: 2 -SpacesInAngles: Never -SpacesInConditionalStatement: false -SpacesInContainerLiterals: true -SpacesInCStyleCastParentheses: false -SpacesInLineCommentPrefix: - Minimum: 1 - Maximum: -1 -SpacesInParentheses: false -SpacesInSquareBrackets: false -SpaceBeforeSquareBrackets: false -BitFieldColonSpacing: Both -Standard: Latest -StatementAttributeLikeMacros: - - Q_EMIT -StatementMacros: - - Q_UNUSED - - QT_REQUIRE_VERSION -TabWidth: 8 -UseCRLF: false -UseTab: Never -WhitespaceSensitiveMacros: - - STRINGIZE - - PP_STRINGIZE - - BOOST_PP_STRINGIZE - - NS_SWIFT_NAME - - CF_SWIFT_NAME -... 
- diff --git a/.clang_format.hook b/.clang_format.hook deleted file mode 100644 index df453cbc7..000000000 --- a/.clang_format.hook +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -set -e - -readonly VERSION="3.8" - -version=$(clang-format -version) - -if ! [[ $version == *"$VERSION"* ]]; then - echo "clang-format version check failed." - echo "a version contains '$VERSION' is needed, but get '$version'" - echo "you can install the right version, and make an soft-link to '$PATH' env" - exit -1 -fi - -clang-format -style=google $@ diff --git a/.cpplint_pre_commit.hook b/.cpplint_pre_commit.hook deleted file mode 100644 index 447b1ba13..000000000 --- a/.cpplint_pre_commit.hook +++ /dev/null @@ -1,60 +0,0 @@ -#!/bin/bash - -#TOTAL_ERRORS=0 -#echo "HAHAHAHAHHA" -#exit 5 -# -#files=$( -# -#if [[ ! $TRAVIS_BRANCH ]]; then -# # install cpplint on local machine. -# if [[ ! $(which cpplint) ]]; then -# pip install cpplint -# fi -# # diff files on local machine. -# files=$(git diff --cached --name-status | awk '$1 != "D" {print $2}') -#else -# # diff files between PR and latest commit on Travis CI. -# branch_ref=$(git rev-parse "$TRAVIS_BRANCH") -# head_ref=$(git rev-parse HEAD) -# files=$(git diff --name-status $branch_ref $head_ref | awk '$1 != "D" {print $2}') -#fi -## The trick to remove deleted files: https://stackoverflow.com/a/2413151 -#for file in $files; do -# echo $file -# if [[ $file =~ ^(patches/.*) ]]; then -# continue; -# else -# cpplint --filter=-readability/fn_size $file; -# TOTAL_ERRORS=$(expr $TOTAL_ERRORS + $?); -# fi -#done -# -#exit $TOTAL_ERRORS - -if git rev-parse --verify HEAD >/dev/null 2>&1 -then - against=HEAD -else - # Initial commit: diff against an empty tree object - against=4b825dc642cb6eb9a060e54bf8d69288fbee4904 -fi - -# Redirect output to stderr.
-exec 1>&2 - -cpplint=cpplint -sum=0 -filters='-build/include_order,-build/namespaces,-legal/copyright,-runtime/references,-build/include_what_you_use' - -# for cpp -for file in $(git diff-index --name-status $against -- | grep -E '\.[ch](pp)?$' | awk '{print $2}'); do - $cpplint --filter=$filters $file - sum=$(expr ${sum} + $?) -done - -if [ ${sum} -eq 0 ]; then - exit 0 -else - exit 1 -fi diff --git a/.gitignore b/.gitignore index 56e85e18b..35c771cf5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,69 +1,160 @@ -build -cmake-build-debug -cmake-build-release -.vscode -FastDeploy.cmake -build-debug.sh -*dist -fastdeploy.egg-info -fastdeploy_python.egg-info -fastdeploy_gpu_python.egg-info -.setuptools-cmake-build -fastdeploy/version.py -fastdeploy/core/config.h -python/fastdeploy/c_lib_wrap.py -python/fastdeploy/LICENSE* -python/build_cpu.sh -python/fastdeploy/ThirdPartyNotices* -*.so* -fpython/astdeploy/libs/third_libs -fastdeploy/core/config.h -fastdeploy/pybind/main.cc -python/fastdeploy/libs/lib* -python/fastdeploy/libs/third_libs -__pycache__ -build_fd_android.sh -python/scripts/process_libraries.py -.vs -.idea -.DS_Store -miniprogram_npm -node_modules -.DS_Store -dist -etc -lib -dist-ssr -coverage -*.local -yalc.* -.yalc -examples/vision/collect_quantize_cc.sh -examples/vision/tests_quantize -fastdeploy/LICENSE -fastdeploy/ThirdPartyNotices.txt -FastDeployCSharp.cmake -python/fastdeploy/code_version.py -*.pdmodel -*.pdiparams -*.pdiparams.info -log.txt -serving/build -serving/build.encrypt -serving/build.encrypt.auth -output -res -tmp +# Virtualenv +/.venv/ +/venv/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +/bin/ +/build/ +/develop-eggs/ +dist/ +/eggs/ +/lib/ +/lib64/ +/output/ +/parts/ +/sdist/ +/var/ +*.egg-info/ +.installed.cfg +*.egg +.eggs + +# AUTHORS and ChangeLog will be generated while packaging +/AUTHORS +/ChangeLog + +# BCloud / BuildSubmitter 
+/build_submitter.* +/logger_client_log + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +.tox/ +.coverage +.cache +.pytest_cache +nosetests.xml +coverage.xml + +# Translations +*.mo +*.pot +*.doctree + +# Sphinx documentation +/docs/_build/ + +.env log -nohup.out -llm/server/__pycache__ -llm/server/data/__pycache__ -llm/server/engine/__pycache__ -llm/server/http_server/__pycache__ -llm/server/log/ -llm/client/build/ -llm/client/dist/ -llm/client/fastdeploy_client.egg-info/ -llm/client/fastdeploy_client/tests/log/ -*.pyc +.vscode +.idea + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Django stuff: *.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pycharm +.DS_Store +.idea/ +FETCH_HEAD + +#log +log/ + +checkpoints/ +checkpoints_origin/ +result/ +result_lora/ + +# npu kernel cache +kernel_meta* + +# building custom ops cache and auto-generated codes +*.o +fastdeploy_ops.py +version.txt +EGG-INFO/ + +# fp8 generated codes +autogen/ +fp8_fp8_gemm_scale_bias_act.cu +fp8_fp8_dual_gemm_scale_bias_act.cu +visitor_fp8_gemm_fused.cu + +# third party +custom_ops/third_party + +fastdeploy/model_executor/ops/base +fastdeploy/model_executor/ops/gpu/deep_gemm + +gemm_profiles.json +nohup.out + +#fp8_deep_gemm +custom_ops/gpu_ops/fp8_deep_gemm/deep_gemm/include/cutlass 
+custom_ops/gpu_ops/fp8_deep_gemm/deep_gemm/include/cute + +# buff +custom_ops/tmp* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6da986415..4b08b23db 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,38 +1,55 @@ +default_install_hook_types: + - pre-commit + - commit-msg +default_stages: + - pre-commit # Run locally +# - manual # Run in CI repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: ed714747d7acbc5790b171702bb012af3b0fe145 - hooks: - - id: check-merge-conflict - - id: check-symlinks - - id: end-of-file-fixer - - id: trailing-whitespace - - id: detect-private-key - - id: check-symlinks - - id: check-added-large-files - -- repo: local - hooks: - - id: copyright_checker - name: copyright_checker - entry: python ./.copyright.hook - language: system - files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py)$ - exclude: (?!.*third_party)^.*$ - -- repo: local - hooks: - - id: clang-format-with-version-check - name: clang-format - description: Format files with ClangFormat. - entry: bash .clang_format.hook -i - language: system - files: \.(c|cc|cxx|cpp|cu|hxx|proto)$ - -- repo: local - hooks: - - id: cpplint-cpp-source - name: cpplint - description: Check C++ code style using cpplint.py. 
- entry: bash .cpplint_pre_commit.hook - language: system - files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx)$ +# 格式化 +- repo: https://github.com/google/yapf + rev: v0.43.0 + hooks: + - id: yapf + args: [--in-place, --verbose] +# 代码检查 +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.7 + hooks: + - id: ruff + args: [--output-format, github, --fix] +# # 拼写检查 +# - repo: https://github.com/codespell-project/codespell +# rev: v2.4.1 +# hooks: +# - id: codespell +# additional_dependencies: ['tomli'] +# args: ['--toml', 'pyproject.toml'] +# 自动排序 +- repo: https://github.com/PyCQA/isort + rev: 6.0.1 + hooks: + - id: isort +# 格式化 +- repo: https://github.com/pre-commit/mirrors-clang-format + rev: v20.1.3 + hooks: + - id: clang-format + # exclude: '.*' + types_or: [c++, cuda] + args: [--style=file, --verbose] +# markdown +- repo: https://github.com/jackdewinter/pymarkdown + rev: v0.9.29 + hooks: + - id: pymarkdown + args: [fix] +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-merge-conflict + - id: check-symlinks + - id: end-of-file-fixer + - id: trailing-whitespace + - id: detect-private-key + - id: check-symlinks + - id: check-added-large-files diff --git a/README.md b/README.md index 35a8133cc..7fa835974 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,16 @@ # FastDeploy 2.0: 大模型推理部署 +

+ + + + + + + + +

+ FastDeploy升级2.0版本支持多种大模型推理(当前仅支持Qwen2,更多模型即将更新支持),其推理部署功能涵盖: - 一行命令即可快速实现模型的服务化部署,并支持流式生成 @@ -9,6 +20,8 @@ FastDeploy升级2.0版本支持多种大模型推理(当前仅支持Qwen2, - 提供 Weight only int8/int4 无损压缩方案 - 支持 Prometheus Metrics 指标 +> 注意: 老版本FastDeploy对于小模型的支持,请checkout [release/1.1.0分支](https://github.com/PaddlePaddle/FastDeploy/tree/release/1.1.0)。 + ## 环境依赖 - A800/H800/H100 - Python>=3.10 @@ -18,10 +31,9 @@ FastDeploy升级2.0版本支持多种大模型推理(当前仅支持Qwen2, ## 安装 -推荐使用Docker环境 +推荐使用Docker安装 ``` -docker pull -iregistry.baidu-int.com/paddlecloud/base-images:paddlecloud-ubuntu24.04-gcc12.3-cuda12.8-cudnn9.7-openmpi4.1.5-bccl2.15.5.4-ofed24.10-hadoop2.2.4.2-afsshell1.9.3.4095-250227 +docker pull iregistry.baidu-int.com/paddlepaddle/fastdeploy:2.0.0-alpha ``` ### 源码安装 @@ -33,9 +45,7 @@ python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/ 2. 安装FastDeploy ``` -# git clone FastDeploy仓库 cd FastDeploy -# 一键编译+安装本机可用的sm架构,whl包产物在dist/ bash build.sh ``` diff --git a/fastdeploy/output/__init__.py b/fastdeploy/output/__init__.py new file mode 100644 index 000000000..c40559bc8 --- /dev/null +++ b/fastdeploy/output/__init__.py @@ -0,0 +1,15 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" \ No newline at end of file diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py new file mode 100644 index 000000000..f115f1345 --- /dev/null +++ b/fastdeploy/output/token_processor.py @@ -0,0 +1,306 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +import os +import threading +import time +import traceback +from collections import Counter + +from paddlenlp.utils.env import MAX_BSZ, MAX_DRAFT_TOKENS, SPECULATE_MAX_BSZ + +from fastdeploy.engine.request import (CompletionOutput, RequestMetrics, + RequestOutput) +from fastdeploy.metrics.metrics import main_process_metrics +from fastdeploy.utils import llm_logger + + +class TokenProcessor(object): + """ + get Token/Score from Paddle inference engine + """ + + def __init__(self, cfg, cached_generated_tokens): + import paddle + + paddle.device.set_device("cpu") + self.cfg = cfg + self.cached_generated_tokens = cached_generated_tokens + self.resource_manager = None + + self.tokens_counter = Counter() + + self.is_speculate_decoding = False + if self.is_speculate_decoding: + self.output_tokens = paddle.full(shape=[ + SPECULATE_MAX_BSZ * MAX_DRAFT_TOKENS + SPECULATE_MAX_BSZ + 2, 1 + ], + fill_value=2, + dtype="int64") + else: + self.output_tokens = paddle.full(shape=[MAX_BSZ + 2, 1], + fill_value=2, + dtype="int64") + self.worker = None + + self.statics_start_time = time.time() + self.number_of_tasks = 0 + 
self.number_of_input_tokens = 0 + self.number_of_output_tokens = 0 + self.total_step = 0 + + def set_resource_manager(self, resource_manager): + """ + set ResourceManager + + Args: + resource_manager (ResourceManager) + """ + assert self.resource_manager is None, "The resource manager is not None, cannot set again." + self.resource_manager = resource_manager + + def run(self): + """ + start thread to get tokens + """ + assert self.resource_manager is not None, "The resource manager is None, cannot run." + if self.worker is not None: + raise Exception("Worker is already running!") + + self.worker = threading.Thread(target=self.process_sampling_results, + args=()) + self.worker.daemon = True + self.worker.start() + + def process_sampling_results(self): + """ + read tokens from paddle inference engine and process + """ + from fastdeploy.model_executor.models import \ + inference_runner_supported_models + if self.cfg.model_config.architectures not in inference_runner_supported_models \ + and "ErnieMoEVLForCausalLM" not in self.cfg.model_config.architectures: + from paddlenlp_ops import get_output, speculate_get_output + else: + os.environ["ELLM_LOG_LEVEL"] = "3" + use_pip_eff_llm = os.getenv('USE_PIP_EFF_LLM') + if use_pip_eff_llm is None: + from fastdeploy.model_executor.ops.gpu import ( + get_output, speculate_get_output) + else: + from efficientllm.ops.gpu import (get_output, + speculate_get_output) + + while True: + try: + rank_id = 0 + is_blocking = True + if self.is_speculate_decoding: + speculate_get_output(self.output_tokens, rank_id, + is_blocking) + else: + get_output(self.output_tokens, rank_id, is_blocking) + + if self.output_tokens[0, 0] == -2: + continue + + self._process_batch_output() + except Exception as e: + llm_logger.info("while get input_data error: {0} {1}".format( + e, str(traceback.format_exc()))) + + def postprocess(self, batch_result): + """ + single post-processing function + + Args: + batch_result (list): batch results + """ + 
self.cached_generated_tokens.put_results(batch_result) + + def _recycle_resources(self, task_id, index, task): + """ + recycle resources + """ + self.resource_manager.stop_flags[index] = True + self.resource_manager.tasks_list[index] = None + self.resource_manager._recycle_block_tables(task.block_tables) + if task_id in self.tokens_counter: + del self.tokens_counter[task_id] + + def _process_batch_output(self): + """ + batch post-processing function + """ + tokens = self.output_tokens.numpy() + batch = self.output_tokens[1, 0] + if not self.is_speculate_decoding: + tokens = tokens[2:batch + 2] + else: + accept_num = tokens[2:batch + 2] + + batch_result = list() + for i in range(batch): + if self.resource_manager.stop_flags[i]: + continue + + if not self.is_speculate_decoding: + token_ids = [int(tokens[i, 0])] + else: + token_ids = tokens[ + 2 + SPECULATE_MAX_BSZ + i * MAX_DRAFT_TOKENS:2 + + SPECULATE_MAX_BSZ + i * MAX_DRAFT_TOKENS + + accept_num[i, 0], + 0, + ].tolist() + if any(token_id < 0 for token_id in token_ids): + continue + + task = self.resource_manager.tasks_list[i] + + if self.cfg.enable_chunked_prefill: + if task.get("prefill_token_num", None) is None: + task.set("prefill_token_num", task.token_chunk_size) + else: + task.prefill_token_num += task.token_chunk_size + if task.prompt_token_ids_len > task.prefill_token_num: + continue + + task_id = task.request_id + + self.total_step += 1 + current_time = time.time() + if self.tokens_counter[task_id] == 0: + metrics = RequestMetrics( + arrival_time=task.arrival_time, + inference_start_time=task.inference_start_time, + first_token_time=time.time() - task.inference_start_time, + time_in_queue=task.schedule_start_time - + task.preprocess_end_time, + preprocess_cost_time=task.preprocess_end_time - + task.preprocess_start_time) + + main_process_metrics.time_to_first_token.observe( + current_time - task.inference_start_time) + main_process_metrics.request_queue_time.observe( + metrics.time_in_queue) + + else: + if 
hasattr(task, 'last_token_time' + ) and task.last_token_time is not None: + token_gen_time = current_time - task.last_token_time + main_process_metrics.time_per_output_token.observe( + token_gen_time) + + task.last_token_time = current_time + metrics = RequestMetrics( + arrival_time=time.time(), + request_start_time=task.arrival_time, + ) + self.number_of_output_tokens += len(token_ids) + result = RequestOutput(request_id=task_id, + outputs=CompletionOutput(index=i, + token_ids=[]), + finished=False, + metrics=metrics) + if self.tokens_counter[task_id] == 0: + if task.messages is not None: + result.prompt = task.messages + result.prompt_token_ids = task.prompt_token_ids + + for token_id in token_ids: + self.tokens_counter[task_id] += 1 + result.outputs.token_ids.append(token_id) + if token_id in task.eos_token_ids: + result.finished = True + result.prompt = task.prompt + result.prompt_token_ids = task.prompt_token_ids + llm_logger.info( + f"Request: {task_id} finished, number of " + f"generated tokens: {self.tokens_counter[task_id]}.") + llm_logger.info( + f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - task.inference_start_time)}" + ) + llm_logger.info(f"{self.resource_manager.info()}") + llm_logger.info( + f"Speculate accept ratio: {1 - self.total_step * 1.0 / self.number_of_output_tokens}" + f" total step: {self.total_step}. 
total_output_token_num: {self.number_of_output_tokens}" + ) + self._recycle_resources(task_id, i, task) + main_process_metrics.num_requests_running.dec(1) + main_process_metrics.request_inference_time.observe( + current_time - task.inference_start_time) + break + batch_result.append(result) + + self.postprocess(batch_result) + + +class WarmUpTokenProcessor(TokenProcessor): + """ + Warmup Processor + """ + + def __init__(self, cfg): + super().__init__(cfg) + self._is_running = True + self._is_blocking = True + + def postprocess(self, batch_result): + pass + + def process_sampling_results(self): + """ + get output from model and process it + """ + from fastdeploy.model_executor.models import \ + inference_runner_supported_models + if self.cfg.model_config.architectures not in inference_runner_supported_models \ + and "ErnieMoEVLForCausalLM" not in self.cfg.model_config.architectures: + from paddlenlp_ops import get_output, speculate_get_output + else: + os.environ["ELLM_LOG_LEVEL"] = "3" + use_pip_eff_llm = os.getenv('USE_PIP_EFF_LLM') + if use_pip_eff_llm is None: + from fastdeploy.model_executor.ops.gpu import ( + get_output, speculate_get_output) + else: + from efficientllm.ops.gpu import (get_output, + speculate_get_output) + + while self._is_running: + try: + rank_id = 0 + if self.is_speculate_decoding: + speculate_get_output(self.output_tokens, rank_id, + self._is_blocking) + else: + get_output(self.output_tokens, rank_id, self._is_blocking) + + if self.output_tokens[0, 0] == -2: + continue + self._process_batch_output() + except Exception as e: + llm_logger.info("while get input_data error: {0} {1}".format( + e, str(traceback.format_exc()))) + + def stop(self): + """ + stop warm up thread + """ + self._is_running = False + self.worker.join() + llm_logger.info("warm up thread stop") + del self.worker