[Cherry-pick] Fix bug for scheduler V1 (#3167)

* [BUG FIX] Fix bug when preempted request rescheduled (#3080)

* Fix bug when preempted request rescheduled

* Fix bug when preempted request rescheduled

* Fix bug when preempted request rescheduled

* Fix bug for offline inference in scheduler v1 (#3117)
This commit is contained in:
chenjian
2025-08-04 17:08:12 +08:00
committed by GitHub
parent a592d17615
commit 5439fb6336
3 changed files with 89 additions and 7 deletions

View File

@@ -499,6 +499,7 @@ class LLMEngine:
enable_thinking = kwargs.get("enable_thinking", None)
request = self.data_processor.process_request(request, self.cfg.max_model_len, enable_thinking=enable_thinking)
request.prompt_token_ids_len = len(request.prompt_token_ids)
request.need_prefill_tokens = request.prompt_token_ids_len
input_ids_len = request.prompt_token_ids_len
request.set(
"max_tokens",

View File

@@ -117,6 +117,7 @@ class Request:
self.status = RequestStatus.WAITING
self.task_type = RequestType.PREFILL
self.idx = None
self.need_prefill_tokens = self.prompt_token_ids_len
@classmethod
def from_dict(cls, d: dict):

View File

@@ -98,6 +98,87 @@ class ResourceManagerV1(ResourceManager):
break
return can_schedule
def _get_num_new_tokens(self, request, token_budget):
num_new_tokens = request.need_prefill_tokens - request.num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)
if not self.config.enable_mm:
return num_new_tokens
inputs = request.multimodal_inputs
request.with_image = False
# Compatible with scenarios without images and videos.
if inputs["images"] is None:
return num_new_tokens
input_ids_lst = request.prompt_token_ids + request.output_token_ids
input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
grid_thw = []
for one in inputs["grid_thw"]:
if one[0] == 1:
grid_thw.append(one)
else:
grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))
image_patch_id = inputs["image_patch_id"]
grid_thw = paddle.to_tensor(grid_thw, dtype="int64")
if request.multimodal_img_boundaries is None:
from fastdeploy.model_executor.ops.gpu import get_img_boundaries
request.multimodal_img_boundaries = get_img_boundaries(
task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id
).numpy()
img_boundaries_idx = request.multimodal_img_boundaries[0]
img_num_per_boundary = request.multimodal_img_boundaries[1]
ori_prompt_len = img_boundaries_idx[-1].item()
grid_thw = grid_thw.numpy().reshape([-1, 3])
pre_end_idx = request.num_computed_tokens
new_end_idx = pre_end_idx + num_new_tokens
if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id:
boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
if boundary_idx == len(img_boundaries_idx):
new_end_idx = ori_prompt_len
else:
new_end_idx = img_boundaries_idx[boundary_idx].item()
elif new_end_idx >= ori_prompt_len and paddle.sum(input_ids[pre_end_idx:new_end_idx] == image_patch_id):
new_end_idx = ori_prompt_len
num_new_tokens = new_end_idx - pre_end_idx
image_mask = input_ids[pre_end_idx:new_end_idx] == image_patch_id
request.with_image = image_mask.any()
if request.with_image:
pre_boundary_idx = np.searchsorted(img_boundaries_idx, pre_end_idx, side="left").item()
if pre_boundary_idx == len(img_boundaries_idx):
request.num_image_start = img_num_per_boundary[-1]
else:
pre_boundary_idx = (
pre_boundary_idx if pre_end_idx == img_boundaries_idx[pre_boundary_idx] else pre_boundary_idx - 1
)
request.num_image_start = img_num_per_boundary[pre_boundary_idx]
new_boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
if new_boundary_idx == len(img_boundaries_idx):
request.num_image_end = img_num_per_boundary[-1]
else:
new_boundary_idx = (
new_boundary_idx if new_end_idx == img_boundaries_idx[new_boundary_idx] else new_boundary_idx - 1
)
request.num_image_end = img_num_per_boundary[new_boundary_idx]
request.num_image_end = img_num_per_boundary[new_boundary_idx]
request.image_type_ids_start = np.sum(grid_thw[: request.num_image_start, 0])
request.image_type_ids_end = np.sum(grid_thw[: request.num_image_end, 0])
request.image_start = np.sum(np.prod(grid_thw[: request.num_image_start], axis=1))
request.image_end = np.sum(np.prod(grid_thw[: request.num_image_end], axis=1))
return num_new_tokens
def exist_prefill(self, scheduled_reqs):
    """Return True if any request in ``scheduled_reqs`` is a prefill task."""
    return any(req.task_type == RequestType.PREFILL for req in scheduled_reqs)
def schedule(self):
with self.lock:
scheduled_reqs: list[Request] = []
@@ -109,8 +190,8 @@ class ResourceManagerV1(ResourceManager):
num_decoding_req_nums = 0
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
if request.num_computed_tokens >= request.prompt_token_ids_len: # to be decoding
if request.num_total_tokens > request.prompt_token_ids_len: # has generated tokens
if request.num_computed_tokens >= request.need_prefill_tokens: # to be decoding
if request.num_total_tokens > request.need_prefill_tokens: # has generated tokens
request.num_computed_tokens = request.num_total_tokens - 1
if (
self.allocated_slots(request) - request.num_total_tokens
@@ -143,7 +224,7 @@ class ResourceManagerV1(ResourceManager):
token_budget -= 1
else: # need to prefill
llm_logger.debug(
f"scheduler prefill task: {request} request.prompt_token_ids_len {request.prompt_token_ids_len} request.num_computed_tokens {request.num_computed_tokens}"
f"scheduler prefill task: {request} request.need_prefill_tokens {request.need_prefill_tokens} request.num_computed_tokens {request.num_computed_tokens}"
)
num_new_tokens = request.prompt_token_ids_len - request.num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)
@@ -170,8 +251,7 @@ class ResourceManagerV1(ResourceManager):
break
request = self.waiting[0]
if request.status == RequestStatus.WAITING:
num_new_tokens = request.num_total_tokens - request.num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)
num_new_tokens = self._get_num_new_tokens(request, token_budget)
num_new_block = self.get_new_block_nums(request, num_new_tokens)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
@@ -192,8 +272,8 @@ class ResourceManagerV1(ResourceManager):
else:
break
elif request.status == RequestStatus.PREEMPTED:
num_new_tokens = request.num_total_tokens - request.num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)
request.need_prefill_tokens = request.num_total_tokens # Before preempted task rescheduled, preempted task has been sent to engine, no more tokens are output, here num_total_tokens should be static and correct
num_new_tokens = self._get_num_new_tokens(request, token_budget)
num_new_block = self.get_new_block_nums(request, num_new_tokens)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(num_new_block):