diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index c2dcdd216..318869c8d 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -499,6 +499,7 @@ class LLMEngine:
         enable_thinking = kwargs.get("enable_thinking", None)
         request = self.data_processor.process_request(request, self.cfg.max_model_len, enable_thinking=enable_thinking)
         request.prompt_token_ids_len = len(request.prompt_token_ids)
+        request.need_prefill_tokens = request.prompt_token_ids_len
         input_ids_len = request.prompt_token_ids_len
         request.set(
             "max_tokens",
diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index db183bb27..7e69f4090 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -117,6 +117,7 @@ class Request:
         self.status = RequestStatus.WAITING
         self.task_type = RequestType.PREFILL
         self.idx = None
+        self.need_prefill_tokens = self.prompt_token_ids_len
 
     @classmethod
     def from_dict(cls, d: dict):
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index 4b99f35a9..73a180f77 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -98,6 +98,87 @@ class ResourceManagerV1(ResourceManager):
                     break
         return can_schedule
 
+    def _get_num_new_tokens(self, request, token_budget):
+        num_new_tokens = request.need_prefill_tokens - request.num_computed_tokens
+        num_new_tokens = min(num_new_tokens, token_budget)
+
+        if not self.config.enable_mm:
+            return num_new_tokens
+
+        inputs = request.multimodal_inputs
+        request.with_image = False
+        # Compatible with scenarios without images and videos.
+        if inputs["images"] is None:
+            return num_new_tokens
+
+        input_ids_lst = request.prompt_token_ids + request.output_token_ids
+        input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
+        grid_thw = []
+        for one in inputs["grid_thw"]:
+            if one[0] == 1:
+                grid_thw.append(one)
+            else:
+                grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))
+
+        image_patch_id = inputs["image_patch_id"]
+        grid_thw = paddle.to_tensor(grid_thw, dtype="int64")
+        if request.multimodal_img_boundaries is None:
+            from fastdeploy.model_executor.ops.gpu import get_img_boundaries
+
+            request.multimodal_img_boundaries = get_img_boundaries(
+                task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id
+            ).numpy()
+
+        img_boundaries_idx = request.multimodal_img_boundaries[0]
+        img_num_per_boundary = request.multimodal_img_boundaries[1]
+        ori_prompt_len = img_boundaries_idx[-1].item()
+        grid_thw = grid_thw.numpy().reshape([-1, 3])
+        pre_end_idx = request.num_computed_tokens
+        new_end_idx = pre_end_idx + num_new_tokens
+        if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id:
+            boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
+            if boundary_idx == len(img_boundaries_idx):
+                new_end_idx = ori_prompt_len
+            else:
+                new_end_idx = img_boundaries_idx[boundary_idx].item()
+        elif new_end_idx >= ori_prompt_len and paddle.sum(input_ids[pre_end_idx:new_end_idx] == image_patch_id):
+            new_end_idx = ori_prompt_len
+        num_new_tokens = new_end_idx - pre_end_idx
+
+        image_mask = input_ids[pre_end_idx:new_end_idx] == image_patch_id
+        request.with_image = image_mask.any()
+        if request.with_image:
+            pre_boundary_idx = np.searchsorted(img_boundaries_idx, pre_end_idx, side="left").item()
+            if pre_boundary_idx == len(img_boundaries_idx):
+                request.num_image_start = img_num_per_boundary[-1]
+            else:
+                pre_boundary_idx = (
+                    pre_boundary_idx if pre_end_idx == img_boundaries_idx[pre_boundary_idx] else pre_boundary_idx - 1
+                )
+                request.num_image_start = img_num_per_boundary[pre_boundary_idx]
+
+            new_boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
+            if new_boundary_idx == len(img_boundaries_idx):
+                request.num_image_end = img_num_per_boundary[-1]
+            else:
+                new_boundary_idx = (
+                    new_boundary_idx if new_end_idx == img_boundaries_idx[new_boundary_idx] else new_boundary_idx - 1
+                )
+                request.num_image_end = img_num_per_boundary[new_boundary_idx]
+
+            request.num_image_end = img_num_per_boundary[new_boundary_idx]
+            request.image_type_ids_start = np.sum(grid_thw[: request.num_image_start, 0])
+            request.image_type_ids_end = np.sum(grid_thw[: request.num_image_end, 0])
+            request.image_start = np.sum(np.prod(grid_thw[: request.num_image_start], axis=1))
+            request.image_end = np.sum(np.prod(grid_thw[: request.num_image_end], axis=1))
+        return num_new_tokens
+
+    def exist_prefill(self, scheduled_reqs):
+        for request in scheduled_reqs:
+            if request.task_type == RequestType.PREFILL:
+                return True
+        return False
+
     def schedule(self):
         with self.lock:
             scheduled_reqs: list[Request] = []
@@ -109,8 +190,8 @@ class ResourceManagerV1(ResourceManager):
             num_decoding_req_nums = 0
             while req_index < len(self.running) and token_budget > 0:
                 request = self.running[req_index]
-                if request.num_computed_tokens >= request.prompt_token_ids_len:  # to be decoding
-                    if request.num_total_tokens > request.prompt_token_ids_len:  # has generated tokens
+                if request.num_computed_tokens >= request.need_prefill_tokens:  # to be decoding
+                    if request.num_total_tokens > request.need_prefill_tokens:  # has generated tokens
                         request.num_computed_tokens = request.num_total_tokens - 1
                     if (
                         self.allocated_slots(request) - request.num_total_tokens
@@ -143,7 +224,7 @@ class ResourceManagerV1(ResourceManager):
                         token_budget -= 1
                 else:  # need to prefill
                     llm_logger.debug(
-                        f"scheduler prefill task: {request} request.prompt_token_ids_len {request.prompt_token_ids_len} request.num_computed_tokens {request.num_computed_tokens}"
+                        f"scheduler prefill task: {request} request.need_prefill_tokens {request.need_prefill_tokens} request.num_computed_tokens {request.num_computed_tokens}"
                     )
                     num_new_tokens = request.prompt_token_ids_len - request.num_computed_tokens
                     num_new_tokens = min(num_new_tokens, token_budget)
@@ -170,8 +251,7 @@ class ResourceManagerV1(ResourceManager):
                     break
                 request = self.waiting[0]
                 if request.status == RequestStatus.WAITING:
-                    num_new_tokens = request.num_total_tokens - request.num_computed_tokens
-                    num_new_tokens = min(num_new_tokens, token_budget)
+                    num_new_tokens = self._get_num_new_tokens(request, token_budget)
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
@@ -192,8 +272,8 @@ class ResourceManagerV1(ResourceManager):
                     else:
                         break
                 elif request.status == RequestStatus.PREEMPTED:
-                    num_new_tokens = request.num_total_tokens - request.num_computed_tokens
-                    num_new_tokens = min(num_new_tokens, token_budget)
+                    request.need_prefill_tokens = request.num_total_tokens  # Before a preempted task is rescheduled, it has already been returned to the engine and no more tokens are generated, so num_total_tokens is static here and is the correct prefill target
+                    num_new_tokens = self._get_num_new_tokens(request, token_budget)
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
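
A note on the chunking logic in `_get_num_new_tokens` above: when chunked prefill would otherwise stop in the middle of an image's patch tokens, the chunk end is snapped forward to the next entry of `img_boundaries_idx` so a single image is never split across two prefill chunks. The sketch below is a minimal NumPy-only illustration of that snapping step; the helper name and the toy boundary array are illustrative, not part of the diff.

```python
import numpy as np

def snap_chunk_end_to_image_boundary(img_boundaries_idx: np.ndarray,
                                      pre_end_idx: int,
                                      proposed_end_idx: int) -> int:
    """Return the number of tokens to schedule this step, extending the
    proposed chunk end to the next image boundary so an image is never split."""
    # Leftmost boundary >= proposed_end_idx, mirroring np.searchsorted(..., side="left") in the diff.
    boundary_idx = np.searchsorted(img_boundaries_idx, proposed_end_idx, side="left").item()
    if boundary_idx == len(img_boundaries_idx):
        new_end_idx = int(img_boundaries_idx[-1])  # past the last boundary: stop at the prompt end
    else:
        new_end_idx = int(img_boundaries_idx[boundary_idx])
    return new_end_idx - pre_end_idx

# Toy prompt layout: text [0, 5), image patches [5, 29), text [29, 32).
boundaries = np.array([0, 5, 29, 32])
# A token budget that would end the chunk at index 20, i.e. inside the image:
print(snap_chunk_end_to_image_boundary(boundaries, pre_end_idx=8, proposed_end_idx=20))  # 21 -> chunk ends at 29
```

The real method only applies this snap when the proposed end still lies inside the original prompt and lands on an `image_patch_id` token; for plain text positions the budget-limited end is kept as-is.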
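
The other half of the change is the new `need_prefill_tokens` field: for a fresh request it equals the prompt length, but when a preempted request is rescheduled it is reset to `num_total_tokens`, because the freed KV cache has to be rebuilt for the prompt *and* the tokens already generated before decoding can resume. The toy bookkeeping below illustrates that; `ToyRequest` is a stand-in for illustration, not FastDeploy's `Request` class.

```python
from dataclasses import dataclass, field

@dataclass
class ToyRequest:
    prompt_token_ids: list
    output_token_ids: list = field(default_factory=list)
    num_computed_tokens: int = 0
    need_prefill_tokens: int = 0

    @property
    def num_total_tokens(self) -> int:
        return len(self.prompt_token_ids) + len(self.output_token_ids)

req = ToyRequest(prompt_token_ids=list(range(100)))
req.need_prefill_tokens = len(req.prompt_token_ids)       # fresh request: prefill the prompt only (100)

# The request generates 20 tokens, then is preempted and its GPU blocks are freed.
req.output_token_ids = list(range(20))
req.num_computed_tokens = 0

# On rescheduling (the RequestStatus.PREEMPTED branch above):
req.need_prefill_tokens = req.num_total_tokens             # re-prefill prompt + generated tokens (120)
print(req.need_prefill_tokens - req.num_computed_tokens)   # 120 tokens to recompute before decoding resumes
```

This is also why the decode check in `schedule()` now compares `num_computed_tokens` against `need_prefill_tokens` rather than `prompt_token_ids_len`.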