fix cache messager bug when d restart (#3386)

This commit is contained in:
chenjian
2025-08-14 11:43:59 +08:00
committed by GitHub
parent 89177d881c
commit 3f86ae0007
2 changed files with 1 additions and 7 deletions

View File

@@ -61,18 +61,12 @@ class RDMACommManager:
Connect to remote gpu and write cache. Connect to remote gpu and write cache.
""" """
assert self.splitwise_role == "prefill", "only prefill can call this method" assert self.splitwise_role == "prefill", "only prefill can call this method"
addr = f"{ip}:{port!s}"
if addr in self.connected_rdma:
return True
ret = self.messager.is_connected(ip, str(port)) ret = self.messager.is_connected(ip, str(port))
if ret: if ret:
self.connected_rdma.add(addr)
return True return True
ret = self.messager.connect(ip, str(port)) ret = self.messager.connect(ip, str(port))
logger.info(f"connect to remote rdma address {ip}:{port} status is {ret}") logger.info(f"connect to remote rdma address {ip}:{port} status is {ret}")
if ret == 0:
self.connected_rdma.add(addr)
return ret == 0 return ret == 0
def write_cache(self, ip, port, local_block_ids, remote_block_ids, layer_idx): def write_cache(self, ip, port, local_block_ids, remote_block_ids, layer_idx):

View File

@@ -265,13 +265,13 @@ class TokenProcessor:
llm_logger.info(f"finished_task_id: {finished_task_id}") llm_logger.info(f"finished_task_id: {finished_task_id}")
self.prefill_result_status[finished_task_id[0]] = finished_task_id[1] self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
if task_id in self.prefill_result_status: if task_id in self.prefill_result_status:
self.split_connector.send_first_token(task.disaggregate_info, [result])
self.resource_manager.stop_flags[index] = True self.resource_manager.stop_flags[index] = True
self.resource_manager.tasks_list[index] = None self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task) self.resource_manager._recycle_block_tables(task)
if self.prefill_result_status[task_id] != "finished": if self.prefill_result_status[task_id] != "finished":
result.error_code = 400 result.error_code = 400
result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}" result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
self.split_connector.send_first_token(task.disaggregate_info, [result])
del self.resource_manager.req_dict[task_id] del self.resource_manager.req_dict[task_id]
break break
else: else: