[BugFix] fix async download bug (#5349)

* fix async download bug

* update log

* Revert "update log"

This reverts commit 5816e602f4.

* update code

* fix mtp bug
This commit is contained in:
kevin
2025-12-05 18:59:12 +08:00
committed by GitHub
parent 5b900667e3
commit c9d7f9e7c3
6 changed files with 59 additions and 22 deletions

View File

@@ -722,6 +722,10 @@ class EngineService:
)
if self.cfg.scheduler_config.splitwise_role != "mixed":
if self.cfg.scheduler_config.splitwise_role == "prefill":
for task in tasks:
# start async preprocess
self.resource_manager.apply_async_preprocess(task)
need_delete_tasks = []
if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
for task in tasks:
@@ -782,18 +786,39 @@ class EngineService:
need_check_req_ids = [task.request_id for task in tasks]
self.split_connector.send_cache_info_to_messager(tasks, 0)
# ensure cache tasks has sent to cache_messager
need_check_req_ids = [task.request_id for task in tasks]
finished_ids, delete_tasks_list = [], []
while need_check_req_ids:
req_ids = self.engine_worker_queue.get_finished_add_cache_task_req()
if req_ids:
self.llm_logger.debug(
f"P has successfully sent cache infos to cache messager for requests: {req_ids}"
)
for req_id in req_ids:
assert req_id in need_check_req_ids
need_check_req_ids.remove(req_id)
finished_ids.extend(self.engine_worker_queue.get_finished_add_cache_task_req())
self.llm_logger.debug(
f"P has successfully sent cache infos to cache messager for requests: {finished_ids}"
)
if finished_ids:
for task in tasks:
result = self.resource_manager.waiting_async_process(task)
if result is None:
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=task.error_code,
error_msg=task.error_message,
)
]
)
delete_tasks_list.append(task)
elif result is False:
if task.request_id in finished_ids:
need_check_req_ids.remove(task.request_id)
finished_ids.remove(task.request_id)
else:
time.sleep(0.001)
for tmp_task in delete_tasks_list:
tasks.remove(tmp_task)
# release resource in P
self.resource_manager.pre_recycle_resource(tmp_task.request_id)
# Fetch requests and add them to the scheduling queue
if tasks:
for task in tasks: