[Bug fix] Fix prefix cache in v1 (#3710)

* [Bug fix] Fix prefix cache in V1

* add comment
Author: chenjian
Date: 2025-09-01 10:14:25 +08:00
Committed by: GitHub
Parent: 10a95f8ed5
Commit: a424ab907f
2 changed files with 76 additions and 9 deletions


@@ -509,7 +509,7 @@ class PrefixCacheManager:
self.metrics.req_count += 1
input_ids = task.prompt_token_ids
req_id = task.request_id
-logger.info(f"request_block_ids: start to allocate blocks for req_id {req_id}")
+logger.info(f"request_match_blocks: start to allocate blocks for req_id {req_id}")
input_token_num = len(input_ids)
common_block_ids = []
# 1. match block
@@ -541,7 +541,7 @@ class PrefixCacheManager:
cpu_recv_block_ids=[],
)
else:
-raise Exception("Not enough GPU memory to allocate cache for matched CPU Cache")
+raise Exception("request_match_blocks: Not enough GPU memory to allocate cache for matched CPU Cache")
# record request cache info
self.cache_info[req_id] = (match_block_node, input_ids)
@@ -563,11 +563,14 @@ class PrefixCacheManager:
if self.metrics.req_count % 10000 == 0:
self.metrics.reset_metrics()
logger.info(
-f"request_block_ids: request block for req_id {req_id}: common_block_ids {common_block_ids}"
+f"request_match_blocks: request block for req_id {req_id}: common_block_ids {common_block_ids}"
)
+# set leaf node temporarily, then update it in update_cache_blocks
+self.req_leaf_map[req_id] = match_block_node
+self.leaf_req_map[match_block_node].add(req_id)
return common_block_ids, matched_token_num, hit_info
except Exception as e:
-logger.error(f"request_block_ids: error: {type(e)} {e}")
+logger.error(f"request_match_blocks: request_block_ids: error: {type(e)} {e}")
raise e
def request_block_ids(self, task, block_size, dec_token_num, *args):
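Sketch, not part of the commit: the values returned above by the renamed request_match_blocks are consumed by ResourceManagerV1.get_prefix_cached_blocks in the second changed file. A minimal caller-side sketch; the call signature (assumed to mirror request_block_ids) and any name not visible in this diff are assumptions for illustration only.

    # Sketch only, not committed code.
    common_block_ids, matched_token_num, hit_info = self.cache_manager.request_match_blocks(
        request, self.config.cache_config.block_size
    )
    request.block_tables = common_block_ids  # reuse the matched prefix blocks (assumed field use)
    if matched_token_num == request.prompt_token_ids_len:
        # full hit: step back one whole block so prefill still runs (see the second file)
        request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size
        request.skip_allocate = True
    else:
        request.num_computed_tokens = matched_token_num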
@@ -723,6 +726,43 @@ class PrefixCacheManager:
except Exception as e:
logger.error(f"release_block_ids: error: {type(e)} {e}")
raise e
+def free_nodes_directly(self, node):
+"""
+Recycle nodes by a query directly.
+"""
+with self.request_release_lock:
+try:
+total_gpu_free_count = 0
+while True:
+if node in self.gpu_lru_leaf_heap:
+self.gpu_lru_leaf_heap.remove(node)
+self.gpu_lru_leaf_set.remove(node)
+if node.shared_count == 0 and node.is_gpu_leaf_node: # reclaim directly
+self._handle_free_gpu_node_without_cpu(node)
+logger.info(f"free_nodes_directly: node {node}")
+total_gpu_free_count += 1
+cur_node = node
+node = node.parent
+if cur_node.hash_value in node.children:
+del node.children[cur_node.hash_value]
+if not node.children:
+if node in self.gpu_lru_leaf_set:
+continue
+if (
+node != self.radix_tree_root
+and node.shared_count == 0
+and node.is_gpu_leaf_node
+and node.is_persistent is False
+):
+heapq.heappush(self.gpu_lru_leaf_heap, node)
+self.gpu_lru_leaf_set.add(node)
+else:
+break
+else:
+break
+except Exception as e:
+logger.error(f"free_nodes_directly: error: {type(e)} {e}")
+raise e
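For illustration (not committed code): free_nodes_directly maintains gpu_lru_leaf_heap and gpu_lru_leaf_set as a pair, the heap ordering evictable leaves and the set giving O(1) membership checks. A standalone sketch of that heap-plus-set pattern with a toy leaf type; the names and the ordering key are assumptions, not FastDeploy's.

    import heapq

    class ToyLeaf:
        def __init__(self, name, last_used_time):
            self.name = name
            self.last_used_time = last_used_time

        def __lt__(self, other):  # heap pops the least recently used leaf first
            return self.last_used_time < other.last_used_time

    lru_leaf_heap: list = []
    lru_leaf_set: set = set()

    def mark_evictable(leaf):
        if leaf not in lru_leaf_set:
            heapq.heappush(lru_leaf_heap, leaf)
            lru_leaf_set.add(leaf)

    def unmark_evictable(leaf):
        # mirrors the removal at the top of free_nodes_directly
        if leaf in lru_leaf_set:
            lru_leaf_heap.remove(leaf)
            lru_leaf_set.remove(leaf)
            heapq.heapify(lru_leaf_heap)  # restore heap order after removing from the middle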
def _handle_free_gpu_node_without_cpu(self, node):
"""
@@ -1066,6 +1106,15 @@ class PrefixCacheManager:
node.last_used_time = current_time
node.req_id_set.add(req_id)
node = node.parent
+def decrease_request_share_count(self, req_id):
+"""
+Decrease node shared count
+"""
+node, input_ids = self.cache_info[req_id]
+while node != self.radix_tree_root:
+node.decrement_shared_count()
+node = node.parent
def build_path(
self,

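For illustration (not committed code): decrease_request_share_count walks from a request's matched node back to the root and drops each node's reference count, so free_nodes_directly later reclaims only nodes whose shared_count has reached zero. A toy version of that walk; the node class is a stand-in, and the matching increment is presumed to happen on the match path, which this diff does not show.

    class ToyNode:
        def __init__(self, parent=None):
            self.parent = parent
            self.shared_count = 0

        def decrement_shared_count(self):
            self.shared_count -= 1

    root = ToyNode()
    mid = ToyNode(parent=root)
    leaf = ToyNode(parent=mid)

    # assume a prior match bumped the counts on the whole path
    for n in (leaf, mid):
        n.shared_count += 1

    # the walk performed by decrease_request_share_count
    node = leaf
    while node is not root:
        node.decrement_shared_count()
        node = node.parent

    assert leaf.shared_count == 0 and mid.shared_count == 0  # now reclaimable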

@@ -117,8 +117,8 @@ class ResourceManagerV1(ResourceManager):
preempted_req = self.running.pop()
preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0
-preempted_req.prefill_block_num = 0
self._free_blocks(preempted_req)
+preempted_req.prefill_block_num = None
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
@@ -305,6 +305,7 @@ class ResourceManagerV1(ResourceManager):
if self.config.cache_config.enable_prefix_caching:
success = self.get_prefix_cached_blocks(request)
if not success:
+self._free_blocks(request)
break
num_new_tokens = self._get_num_new_tokens(request, token_budget)
@@ -327,16 +328,24 @@ class ResourceManagerV1(ResourceManager):
self.stop_flags[allocated_position] = False
self.req_dict[request.request_id] = allocated_position
else:
+if self.config.cache_config.enable_prefix_caching:
+self._free_blocks(request)
break
elif request.status == RequestStatus.PREEMPTED:
request.need_prefill_tokens = (
request.num_total_tokens
) # Before preempted task rescheduled, preempted task has been sent to engine, no more tokens are output, here num_total_tokens should be static and correct
+if self.config.cache_config.enable_prefix_caching:
+success = self.get_prefix_cached_blocks(request)
+if not success:
+self._free_blocks(request)
+break
num_new_tokens = self._get_num_new_tokens(request, token_budget)
num_new_block = self.get_new_block_nums(request, num_new_tokens)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
-request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(num_new_block))
+if not request.get("skip_allocate", False):
+request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(num_new_block))
self.waiting.popleft()
self.running.append(request)
scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens))
@@ -344,6 +353,8 @@ class ResourceManagerV1(ResourceManager):
request.num_computed_tokens += num_new_tokens
request.status = RequestStatus.RUNNING
else:
+if self.config.cache_config.enable_prefix_caching:
+self._free_blocks(request)
break
else:
llm_logger.error("Unknown request status type")
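For illustration (not committed code): the additions in this hunk and the two hunks above it all follow one rule: once get_prefix_cached_blocks has claimed blocks for a request, any early exit from the scheduling loop hands them back via _free_blocks, so the claimed blocks do not stay pinned while the request waits to be retried. A schematic of that shape, using the method names from this file but wrapped in a stand-in helper.

    def try_schedule(self, request, token_budget):
        """Schematic of the pattern added in these hunks; not a literal excerpt."""
        if self.config.cache_config.enable_prefix_caching:
            if not self.get_prefix_cached_blocks(request):
                self._free_blocks(request)       # give back what the match claimed
                return False                     # (the real code breaks out of its loop)
        num_new_tokens = self._get_num_new_tokens(request, token_budget)
        num_new_block = self.get_new_block_nums(request, num_new_tokens)
        if not self.cache_manager.can_allocate_gpu_blocks(num_new_block):
            if self.config.cache_config.enable_prefix_caching:
                self._free_blocks(request)       # same rule on the allocation-failure path
            return False
        return True                              # safe to schedule the request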
@@ -399,7 +410,7 @@ class ResourceManagerV1(ResourceManager):
main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
if matched_token_num == request.prompt_token_ids_len:
-request.num_computed_tokens = matched_token_num - 1
+request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size
request.skip_allocate = True
else:
request.num_computed_tokens = matched_token_num
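A quick worked example of the full-hit branch above, with illustrative numbers only (not part of the commit):

    block_size = 64
    prompt_token_ids_len = 512
    matched_token_num = 512                                  # whole prompt found in the prefix cache

    num_computed_tokens = matched_token_num - block_size     # 448 (was 511 with the old "- 1")
    tokens_to_recompute = prompt_token_ids_len - num_computed_tokens  # 64: the last block re-runs in prefill
    skip_allocate = True   # matched blocks already cover the prompt, so the
                           # "Allocate blocks to prefill" step in the earlier hunk is skipped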
@@ -417,8 +428,15 @@ class ResourceManagerV1(ResourceManager):
def _free_blocks(self, request: Request):
if self.config.cache_config.enable_prefix_caching:
# TODO(chengyanfu): support cache ouput blocks for prefix caching
-self.cache_manager.release_block_ids_async(request)
-self.cache_manager.recycle_gpu_blocks(request.block_tables[request.prefill_block_num :])
+if request.get("prefill_block_num", None) is None:
+leaf_node = self.cache_manager.req_leaf_map[request.request_id]
+self.cache_manager.decrease_request_share_count(request.request_id)
+self.cache_manager.free_nodes_directly(leaf_node)
+self.cache_manager.recycle_gpu_blocks(request.block_tables[request.cache_info[0]:])
+else:
+self.cache_manager.release_block_ids_async(request)
+self.cache_manager.recycle_gpu_blocks(request.block_tables[request.prefill_block_num :])
else:
self.cache_manager.recycle_gpu_blocks(request.block_tables)
request.block_tables = []
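For illustration (not committed code): on the new prefill_block_num-is-None path, only the tail of block_tables beyond the matched prefix is recycled directly; the matched prefix blocks are handed back through decrease_request_share_count and free_nodes_directly instead. The sketch below reads request.cache_info[0] as the number of matched blocks, which is an assumption not visible in this diff.

    # Illustration with made-up values.
    block_tables = [10, 11, 12, 13, 14, 15]   # GPU blocks currently held by the request
    matched_block_num = 3                      # assumed meaning of request.cache_info[0]

    direct_recycle = block_tables[matched_block_num:]   # [13, 14, 15]: never part of the cached prefix
    # blocks 10, 11, 12 belong to radix-tree nodes and are released by
    # decrease_request_share_count(...) followed by free_nodes_directly(...)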