fix for pd decode not enough block (#4224)

This commit is contained in:
chenjian
2025-09-23 18:01:25 +08:00
committed by GitHub
parent 918ccdb123
commit acecd5bebe
2 changed files with 36 additions and 8 deletions

View File

@@ -30,7 +30,7 @@ import paddle
import zmq import zmq
from opentelemetry import trace from opentelemetry import trace
from fastdeploy.engine.request import Request, RequestOutput from fastdeploy.engine.request import Request, RequestOutput, RequestType
from fastdeploy.engine.resource_manager import ResourceManager from fastdeploy.engine.resource_manager import ResourceManager
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
from fastdeploy.inter_communicator import ( from fastdeploy.inter_communicator import (
@@ -673,6 +673,22 @@ class EngineService:
tasks = self.resource_manager.schedule() tasks = self.resource_manager.schedule()
# 3. Send to engine # 3. Send to engine
if tasks: if tasks:
if self.cfg.scheduler_config.splitwise_role == "decode":
for task in tasks:
if task.task_type == RequestType.PREEMPTED:
msg = f"{task.request_id} decode not enough blocks, need to be rescheduled."
self.llm_logger.error(msg)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
self.resource_manager.get_real_bsz() self.resource_manager.get_real_bsz()
self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz)) self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
else: else:

View File

@@ -136,13 +136,23 @@ class ResourceManagerV1(ResourceManager):
preempted_req = self.running.pop() preempted_req = self.running.pop()
preempted_req.status = RequestStatus.PREEMPTED preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0 preempted_req.num_computed_tokens = 0
if self.config.scheduler_config.splitwise_role == "decode":
self.tasks_list[preempted_req.idx] = None
self.stop_flags[preempted_req.idx] = True
if preempted_req.request_id in self.requests:
del self.requests[preempted_req.request_id]
if preempted_req.request_id in self.req_dict:
del self.req_dict[preempted_req.request_id]
self._free_blocks(preempted_req)
main_process_metrics.num_requests_running.dec(1)
else:
self._free_blocks(preempted_req) self._free_blocks(preempted_req)
preempted_req.cached_block_num = 0 preempted_req.cached_block_num = 0
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id) self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
main_process_metrics.num_requests_waiting.inc(1) main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.num_requests_running.dec(1) main_process_metrics.num_requests_running.dec(1)
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
if preempted_req == request: if preempted_req == request:
# No more request to preempt. # No more request to preempt.
can_schedule = False can_schedule = False
@@ -583,7 +593,9 @@ class ResourceManagerV1(ResourceManager):
with self.lock: with self.lock:
self.tasks_list[request.idx] = None self.tasks_list[request.idx] = None
self.stop_flags[request.idx] = True self.stop_flags[request.idx] = True
if request.request_id in self.requests:
del self.requests[request.request_id] del self.requests[request.request_id]
if request.request_id in self.req_dict:
del self.req_dict[request.request_id] del self.req_dict[request.request_id]
self._free_blocks(request) self._free_blocks(request)