diff --git a/custom_ops/gpu_ops/get_img_boundaries.cc b/custom_ops/gpu_ops/get_img_boundaries.cc new file mode 100644 index 000000000..30ca6d269 --- /dev/null +++ b/custom_ops/gpu_ops/get_img_boundaries.cc @@ -0,0 +1,60 @@ +// Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/extension.h" + +std::vector<paddle::Tensor> GetImgBoundaries(const paddle::Tensor& task_input_ids, +                                             const paddle::Tensor& grid_thw, +                                             const int64_t image_patch_id) { +  // All tensor in cpu +  auto input_ids_ptr = task_input_ids.data<int64_t>(); +  int64_t seq_lens_origin = task_input_ids.numel(); +  auto grid_thw_ptr = grid_thw.data<int64_t>(); + +  int token_times = 4; +  int token_idx = 0; +  int image_idx = 0; +  std::vector<int> img_boundaries, img_nums; +  img_boundaries.emplace_back(0); +  img_nums.emplace_back(0); +  while (token_idx < seq_lens_origin) { +    if (input_ids_ptr[token_idx] != image_patch_id) { +      do { +        token_idx++; +      } while (token_idx < seq_lens_origin && input_ids_ptr[token_idx] != image_patch_id); +    } else { +      int cur_image_token_len = (grid_thw_ptr[image_idx * 3 + 1] * grid_thw_ptr[image_idx * 3 + 2]) / token_times; +      image_idx++; +      token_idx += cur_image_token_len; +    } +    img_boundaries.emplace_back(token_idx); +    img_nums.emplace_back(image_idx); +  } + +  int64_t num_img_boundaries = static_cast<int64_t>(img_boundaries.size()); +  auto out = paddle::full({2, num_img_boundaries}, 0, paddle::DataType::INT64, paddle::CPUPlace()); + +  for (int i 
= 0; i < num_img_boundaries; i++) { +    out.data<int64_t>()[i] = img_boundaries[i]; +    out.data<int64_t>()[num_img_boundaries + i] = img_nums[i]; +  } + +  return {out}; +} + +PD_BUILD_OP(get_img_boundaries) +    .Inputs({"task_input_ids", "grid_thw"}) +    .Attrs({"image_patch_id: int64_t"}) +    .Outputs({"img_boundaries"}) +    .SetKernelFn(PD_KERNEL(GetImgBoundaries)); diff --git a/custom_ops/setup_ops.py b/custom_ops/setup_ops.py index 128403ad3..1cb091116 100644 --- a/custom_ops/setup_ops.py +++ b/custom_ops/setup_ops.py @@ -256,6 +256,7 @@ elif paddle.is_compiled_with_cuda(): "gpu_ops/gather_idx.cu", "gpu_ops/get_output_ep.cc", "gpu_ops/get_mm_split_fuse.cc", + "gpu_ops/get_img_boundaries.cc", "gpu_ops/token_penalty_multi_scores.cu", "gpu_ops/token_penalty_only_once.cu", "gpu_ops/stop_generation.cu", diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index db183bb27..69f8516c3 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -105,6 +105,7 @@ class Request: # Multi-modal related self.multimodal_inputs = multimodal_inputs self.multimodal_data = multimodal_data + self.multimodal_img_boundaries = None self.enable_thinking = enable_thinking self.trace_carrier = trace_carrier diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 4b99f35a9..5dc878895 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -1,3 +1,19 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +""" + import threading import time from collections import deque @@ -6,6 +22,9 @@ from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import Union +import numpy as np +import paddle + from fastdeploy.engine.request import Request, RequestStatus, RequestType from fastdeploy.engine.resource_manager import ResourceManager from fastdeploy.utils import llm_logger @@ -98,6 +117,90 @@ class ResourceManagerV1(ResourceManager): break return can_schedule + def _get_num_new_tokens(self, request, token_budget, schedule_waiting=False): + if schedule_waiting: + num_new_tokens = request.num_total_tokens - request.num_computed_tokens + else: + num_new_tokens = request.prompt_token_ids_len - request.num_computed_tokens + num_new_tokens = min(num_new_tokens, token_budget) + + if not self.config.enable_mm: + return num_new_tokens + + inputs = request.multimodal_inputs + request.with_image = False + # Compatible with scenarios without images and videos. 
+ if inputs["images"] is None: + return num_new_tokens + + input_ids_lst = request.prompt_token_ids + request.output_token_ids + input_ids = paddle.to_tensor(input_ids_lst, dtype="int64") + grid_thw = [] + for one in inputs["grid_thw"]: + if one[0] == 1: + grid_thw.append(one) + else: + grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2)) + + image_patch_id = inputs["image_patch_id"] + grid_thw = paddle.to_tensor(grid_thw, dtype="int64") + if request.multimodal_img_boundaries is None: + from fastdeploy.model_executor.ops.gpu import get_img_boundaries + + request.multimodal_img_boundaries = get_img_boundaries( + task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id + ).numpy() + + img_boundaries_idx = request.multimodal_img_boundaries[0] + img_num_per_boundary = request.multimodal_img_boundaries[1] + ori_prompt_len = img_boundaries_idx[-1].item() + grid_thw = grid_thw.numpy().reshape([-1, 3]) + pre_end_idx = request.num_computed_tokens + new_end_idx = pre_end_idx + num_new_tokens + if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id: + boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item() + if boundary_idx == len(img_boundaries_idx): + new_end_idx = ori_prompt_len + else: + new_end_idx = img_boundaries_idx[boundary_idx].item() + elif new_end_idx >= ori_prompt_len and paddle.sum(input_ids[pre_end_idx:new_end_idx] == image_patch_id): + new_end_idx = ori_prompt_len + num_new_tokens = new_end_idx - pre_end_idx + + image_mask = input_ids[pre_end_idx:new_end_idx] == image_patch_id + request.with_image = image_mask.any() + if request.with_image: + pre_boundary_idx = np.searchsorted(img_boundaries_idx, pre_end_idx, side="left").item() + if pre_boundary_idx == len(img_boundaries_idx): + request.num_image_start = img_num_per_boundary[-1] + else: + pre_boundary_idx = ( + pre_boundary_idx if pre_end_idx == img_boundaries_idx[pre_boundary_idx] else pre_boundary_idx - 1 + ) + request.num_image_start 
= img_num_per_boundary[pre_boundary_idx] + + new_boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item() + if new_boundary_idx == len(img_boundaries_idx): + request.num_image_end = img_num_per_boundary[-1] + else: + new_boundary_idx = ( + new_boundary_idx if new_end_idx == img_boundaries_idx[new_boundary_idx] else new_boundary_idx - 1 + ) + request.num_image_end = img_num_per_boundary[new_boundary_idx] + + request.num_image_end = img_num_per_boundary[new_boundary_idx] + request.image_type_ids_start = np.sum(grid_thw[: request.num_image_start, 0]) + request.image_type_ids_end = np.sum(grid_thw[: request.num_image_end, 0]) + request.image_start = np.sum(np.prod(grid_thw[: request.num_image_start], axis=1)) + request.image_end = np.sum(np.prod(grid_thw[: request.num_image_end], axis=1)) + return num_new_tokens + + def exist_prefill(self, scheduled_reqs): + for request in scheduled_reqs: + if request.task_type == RequestType.PREFILL: + return True + return False + def schedule(self): with self.lock: scheduled_reqs: list[Request] = [] @@ -145,8 +248,7 @@ class ResourceManagerV1(ResourceManager): llm_logger.debug( f"scheduler prefill task: {request} request.prompt_token_ids_len {request.prompt_token_ids_len} request.num_computed_tokens {request.num_computed_tokens}" ) - num_new_tokens = request.prompt_token_ids_len - request.num_computed_tokens - num_new_tokens = min(num_new_tokens, token_budget) + num_new_tokens = self._get_num_new_tokens(request, token_budget) num_new_block = self.get_new_block_nums(request, num_new_tokens) # Allocate blocks to prefill if self.cache_manager.can_allocate_gpu_blocks(num_new_block): @@ -168,10 +270,11 @@ class ResourceManagerV1(ResourceManager): while self.waiting and token_budget > 0: if len(self.running) == self.max_num_seqs: break + if self.config.enable_mm and self.exist_prefill(scheduled_reqs): + break request = self.waiting[0] if request.status == RequestStatus.WAITING: - num_new_tokens = 
request.num_total_tokens - request.num_computed_tokens - num_new_tokens = min(num_new_tokens, token_budget) + num_new_tokens = self._get_num_new_tokens(request, token_budget, True) num_new_block = self.get_new_block_nums(request, num_new_tokens) # Allocate blocks to prefill if self.cache_manager.can_allocate_gpu_blocks(num_new_block): @@ -192,8 +295,7 @@ class ResourceManagerV1(ResourceManager): else: break elif request.status == RequestStatus.PREEMPTED: - num_new_tokens = request.num_total_tokens - request.num_computed_tokens - num_new_tokens = min(num_new_tokens, token_budget) + num_new_tokens = self._get_num_new_tokens(request, token_budget, True) num_new_block = self.get_new_block_nums(request, num_new_tokens) # Allocate blocks to prefill if self.cache_manager.can_allocate_gpu_blocks(num_new_block): diff --git a/fastdeploy/input/ernie_vl_processor.py b/fastdeploy/input/ernie_vl_processor.py index 1b8669e29..a2c4dd1e5 100644 --- a/fastdeploy/input/ernie_vl_processor.py +++ b/fastdeploy/input/ernie_vl_processor.py @@ -17,6 +17,7 @@ import os import numpy as np +from paddleformers.generation import GenerationConfig from fastdeploy.engine.request import Request from fastdeploy.input.ernie_processor import ErnieProcessor @@ -62,6 +63,15 @@ class ErnieMoEVLProcessor(ErnieProcessor): if reasoning_parser_obj: self.reasoning_parser = reasoning_parser_obj(self.tokenizer) + # Generation config + try: + self.generation_config = GenerationConfig.from_pretrained(model_name_or_path) + except Exception as e: + data_processor_logger.warning( + f"Can't find generation config: {e}, so it will not use generation_config field in the model config" + ) + self.generation_config = None + def get_pad_id(self): """get pad id""" return self.tokenizer.pad_token_id @@ -75,12 +85,34 @@ class ErnieMoEVLProcessor(ErnieProcessor): """ self.tokenizer = self.ernie_processor.tokenizer + def _apply_default_parameters(self, request): + """ + Apply default value for parameters in request + """ + + 
def set_value(req, key, value): + value = getattr(self.generation_config, key, value) + if isinstance(req, dict): + if key not in req: + req[key] = value + else: + if req.get(key) is None: + req.set(key, value) + + set_value(request, "top_p", 0.7) + set_value(request, "temperature", 1.0) + set_value(request, "repetition_penalty", 1.0) + set_value(request, "frequency_penalty", 0.0) + set_value(request, "presence_penalty", 0.0) + return request + def process_request(self, request, max_model_len=None, **kwargs): """process the input data""" task = request.to_dict() task["enable_thinking"] = kwargs.get("enable_thinking", True) self.process_request_dict(task, max_model_len) request = Request.from_dict(task) + request = self._apply_default_parameters(request) return request @@ -162,6 +194,7 @@ class ErnieMoEVLProcessor(ErnieProcessor): def process_request_dict(self, request, max_model_len=None): """process the input data""" + request = self._apply_default_parameters(request) if not request.get("eos_token_ids"): request["eos_token_ids"] = self.eos_token_ids @@ -191,7 +224,7 @@ class ErnieMoEVLProcessor(ErnieProcessor): if metadata and metadata.get("generated_token_ids"): self.append_generated_tokens(outputs, metadata["generated_token_ids"]) outputs = self.pack_outputs(outputs) - request["prompt_token_ids"] = outputs["input_ids"] + request["prompt_token_ids"] = outputs["input_ids"].tolist() request["prompt_token_ids_len"] = len(request["prompt_token_ids"]) request["multimodal_inputs"] = outputs @@ -227,6 +260,7 @@ class ErnieMoEVLProcessor(ErnieProcessor): outs["grid_thw"] = np.vstack(outs["grid_thw"]) outs["image_type_ids"] = np.array(outs["image_type_ids"]) + outs["image_patch_id"] = self.image_patch_id # Convert lists to arrays outs["input_ids"] = np.array(outs["input_ids"], dtype=np.int64) outs["token_type_ids"] = np.array(outs["token_type_ids"], dtype=np.int64) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 
7ea6b1cb8..24c85d12c 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -211,6 +211,47 @@ class GPUModelRunner(ModelRunnerBase): prefill_start_index = request.prefill_start_index prefill_end_index = request.prefill_end_index length = prefill_end_index - prefill_start_index + if self.enable_mm: + inputs = request.multimodal_inputs + if request.with_image: + vision_inputs = {} + vision_inputs["input_ids"] = paddle.to_tensor( + inputs["input_ids"][prefill_start_index:prefill_end_index], dtype=paddle.int64 + ) + vision_inputs["token_type_ids"] = paddle.to_tensor( + inputs["token_type_ids"][prefill_start_index:prefill_end_index], dtype=paddle.int64 + ) + vision_inputs["image_type_ids"] = paddle.to_tensor( + inputs["image_type_ids"][request.image_type_ids_start : request.image_type_ids_end], + dtype=paddle.int64, + ) + vision_inputs["images"] = paddle.to_tensor( + inputs["images"][request.image_start : request.image_end], dtype="uint8" + ) + vision_inputs["grid_thw"] = paddle.to_tensor( + inputs["grid_thw"][request.num_image_start : request.num_image_end], dtype="int64" + ) + self.share_inputs["image_features"] = self.extract_vision_features(vision_inputs) + else: + self.share_inputs["image_features"] = None + + if inputs["position_ids"] is not None: + position_ids = paddle.to_tensor( + request.multimodal_inputs["position_ids"], + dtype="int64", + ).unsqueeze([0]) + else: + position_ids = None + + enable_thinking = request.get("enable_thinking", True) + enable_thinking = enable_thinking if enable_thinking is not None else True + self.share_inputs["enable_thinking"][:] = enable_thinking + self.share_inputs["need_think_end"][idx : idx + 1, :] = 1 if enable_thinking else 0 + self.share_inputs["reasoning_index"][idx : idx + 1, :] = request.get("reasoning_max_tokens", 2048) + self.share_inputs["rope_emb"][idx : idx + 1, :] = self.prepare_rope3d( + position_ids, request.get("max_tokens", 2048) + ) + input_ids = 
request.prompt_token_ids + request.output_token_ids self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( input_ids[prefill_start_index:prefill_end_index]