[Scheduler] update v1 prefill batch (#4611)

* update v1 prefill batch

* update code

* update code
Author: kevin
Date: 2025-10-31 14:03:01 +08:00
Committed by: GitHub
Parent: dde7ba3f9e
Commit: 64e875b460

3 changed files with 31 additions and 7 deletions


@@ -328,6 +328,26 @@ class ResourceManagerV1(ResourceManager):
inputs["mm_positions"] = []
inputs["mm_hashes"] = []
def _is_mm_request(self, request):
inputs = request.multimodal_inputs
if inputs is None or len(inputs) == 0:
return False
if (
(inputs.get("video_feature_urls") is not None and len(inputs["video_feature_urls"]) > 0)
or (inputs.get("image_feature_urls") is not None and len(inputs["image_feature_urls"]) > 0)
or (inputs.get("audio_feature_urls") is not None and len(inputs["audio_feature_urls"]) > 0)
):
return True
elif (
inputs.get("images", None) is not None
and inputs.get("image_patch_id", None) is not None
and inputs.get("grid_thw", None) is not None
):
return True
return False
def _get_num_new_tokens(self, request, token_budget):
# TODO: set condition to new _get_num_new_tokens
num_new_tokens = request.need_prefill_tokens - request.num_computed_tokens
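
For context, the new helper reduces to two checks: precomputed feature URLs (video/image/audio), or raw pixels plus the metadata needed to embed them. A condensed, self-contained sketch of the same classification follows; only the key names come from the diff above, while the request stub and its values are invented for illustration.

# Hypothetical sketch: condensed version of _is_mm_request's classification.
from types import SimpleNamespace

def is_mm_request(request):
    inputs = request.multimodal_inputs
    if not inputs:  # covers both None and empty dict
        return False
    # Condensed equivalent of the "is not None and len(...) > 0" URL checks.
    if any(inputs.get(k) for k in ("video_feature_urls", "image_feature_urls", "audio_feature_urls")):
        return True
    # Raw-pixel path: images plus the patch id and grid metadata to embed them.
    return all(inputs.get(k) is not None for k in ("images", "image_patch_id", "grid_thw"))

text_only = SimpleNamespace(multimodal_inputs={"prompt_token_ids": [1, 2, 3]})
with_urls = SimpleNamespace(multimodal_inputs={"image_feature_urls": ["feat_0.bin"]})
assert not is_mm_request(text_only)
assert is_mm_request(with_urls)
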
@@ -461,6 +481,12 @@ class ResourceManagerV1(ResourceManager):
         # Compatible with scenarios without images and videos.
         return num_new_tokens
 
+    def exist_mm_prefill(self, scheduled_reqs):
+        for request in scheduled_reqs:
+            if request.task_type == RequestType.PREFILL and self._is_mm_request(request):
+                return True
+        return False
+
     def exist_prefill(self, scheduled_reqs):
         for request in scheduled_reqs:
             if request.task_type == RequestType.PREFILL:
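
Both existence scans are linear passes over the scheduled batch, and `exist_mm_prefill` simply narrows `exist_prefill` with the new helper. As a readability note, they reduce to `any()` expressions; a sketch, with a stand-in `RequestType` enum since the real one is defined elsewhere in the scheduler module:

from enum import Enum, auto

class RequestType(Enum):  # stand-in; the real enum lives elsewhere in FastDeploy
    PREFILL = auto()
    DECODE = auto()

# any()-based equivalents of the two loops above (sketch, not the shipped code):
def exist_prefill(scheduled_reqs):
    return any(r.task_type == RequestType.PREFILL for r in scheduled_reqs)

def exist_mm_prefill(scheduled_reqs, is_mm_request):
    return any(r.task_type == RequestType.PREFILL and is_mm_request(r) for r in scheduled_reqs)
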
@@ -606,12 +632,12 @@ class ResourceManagerV1(ResourceManager):
         while self.waiting and token_budget > 0:
             if len(self.running) == self.max_num_seqs:
                 break
-            if not self.enable_max_prefill and (
-                (self.config.model_config.enable_mm or paddle.is_compiled_with_xpu())
-                and self.exist_prefill(scheduled_reqs)
+            request = self.waiting[0]
+            if (self._is_mm_request(request) and self.exist_mm_prefill(scheduled_reqs)) or (
+                paddle.is_compiled_with_xpu() and self.exist_prefill(scheduled_reqs)
             ):
                 break
-            request = self.waiting[0]
             if request.status == RequestStatus.WAITING:
+                self._update_mm_hashes(request)
                 # Enable prefix caching
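
The net effect of the reworked guard: the old condition held back any new prefill once one was already scheduled on a multimodal model (or on XPU), while the new one only holds back a multimodal prefill when the batch already contains another multimodal prefill, so text-only prefills can keep joining the batch. A self-contained sketch of the decision, where the exist_* checks are simplified to treat every scheduled request as a prefill and `paddle.is_compiled_with_xpu()` becomes a plain boolean; both are assumptions for illustration:

# Sketch of the new loop guard; exist_* checks and the XPU probe are simplified.
def should_hold_back(request, scheduled_reqs, is_mm, is_xpu=False):
    has_mm_prefill = any(is_mm(r) for r in scheduled_reqs)   # ~ exist_mm_prefill
    has_any_prefill = len(scheduled_reqs) > 0                # ~ exist_prefill
    return (is_mm(request) and has_mm_prefill) or (is_xpu and has_any_prefill)

is_mm = lambda r: r["mm"]
batch = [{"mm": True}]                                       # one MM prefill already scheduled
assert should_hold_back({"mm": True}, batch, is_mm)          # a second MM prefill must wait
assert not should_hold_back({"mm": False}, batch, is_mm)     # a text prefill can still join
assert should_hold_back({"mm": False}, batch, is_mm, is_xpu=True)  # XPU: any prefill blocks
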


@@ -471,8 +471,6 @@ class GPUModelRunner(ModelRunnerBase):
                 else:
                     raise ValueError(f"multiple modalities model {self.model_config.model_type} is not supported")
                 self.share_inputs["image_features"] = image_features[-actual_image_token_num:]
-            else:
-                self.share_inputs["image_features"] = None
 
             position_ids = request.multimodal_inputs["position_ids"]
             rope_3d_position_ids["position_ids_idx"].append(request.idx)
@@ -495,6 +493,7 @@ class GPUModelRunner(ModelRunnerBase):
         req_len = len(req_dicts)
         has_prefill_task = False
         has_decode_task = False
+        self.share_inputs["image_features"] = None
         multi_vision_inputs = {"images_lst": [], "grid_thw_lst": [], "vit_position_ids_lst": [], "cu_seqlens": [0]}
         rope_3d_position_ids = {
             "position_ids_idx": [],


@@ -283,7 +283,6 @@ class PaddleDisWorkerProc:
         # The first worker detects whether there are tasks in the task queue
         if local_rank == 0:
             if self.task_queue.num_tasks() > 0:
-                # VL only support 1 batch to prefill
                 if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
                     self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
                 ):
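
The deleted comment described the legacy single-batch restriction for VL prefill; as the surviving condition shows, the worker-side gate is bypassed entirely when `ENABLE_V1_KVCACHE_SCHEDULER` is set, since the V1 resource manager above now handles multimodal prefill batching itself. A sketch of the gate, with the env flag and model/worker state reduced to booleans:

# Mirrors: envs.ENABLE_V1_KVCACHE_SCHEDULER or not (enable_mm and exist_prefill())
def first_worker_may_fetch(v1_scheduler_enabled, enable_mm, worker_has_prefill):
    return v1_scheduler_enabled or not (enable_mm and worker_has_prefill)

assert first_worker_may_fetch(True, True, True)        # V1 scheduler: always fetch
assert not first_worker_may_fetch(False, True, True)   # legacy + MM prefill in flight: wait
assert first_worker_may_fetch(False, True, False)      # legacy, no prefill running: fetch
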