Mirror of https://github.com/PaddlePaddle/FastDeploy.git, synced 2025-10-06 00:57:33 +08:00
[optimize] Optimize prefix caching in v1 release/2.1 (#3823)
* [optimize] Optimize prefix caching in v1

---------

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
@@ -256,7 +256,11 @@ class PrefixCacheManager:
         Check if num_blocks gpu blocks can be allocated.
         """
         if len(self.gpu_free_block_list) < num_blocks:
-            return False
+            self.free_block_ids(num_blocks)
+            if len(self.gpu_free_block_list) < num_blocks:
+                return False
+            else:
+                return True
         else:
             return True

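The rewritten check no longer fails as soon as the free list is short: it first triggers a synchronous reclaim via free_block_ids and only then re-checks. A minimal standalone sketch of that allocate-or-reclaim pattern (the FreeList class and its reclaim method are illustrative stand-ins, not FastDeploy APIs):

class FreeList:
    """Toy free list that can lazily pull blocks back from an evictable pool."""

    def __init__(self, free_blocks, evictable_blocks):
        self.free_blocks = list(free_blocks)
        self.evictable_blocks = list(evictable_blocks)

    def reclaim(self, need):
        # Move evictable blocks onto the free list until `need` is satisfied.
        while self.evictable_blocks and len(self.free_blocks) < need:
            self.free_blocks.append(self.evictable_blocks.pop())

    def can_allocate(self, num_blocks):
        if len(self.free_blocks) < num_blocks:
            self.reclaim(num_blocks)  # try to free blocks before giving up
            return len(self.free_blocks) >= num_blocks
        return True

pool = FreeList(free_blocks=[0, 1], evictable_blocks=[2, 3, 4])
assert pool.can_allocate(4)  # succeeds only because reclaim() ran first
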
@@ -447,7 +451,7 @@ class PrefixCacheManager:
         """
         return (input_token_num + block_size - 1) // block_size

-    def update_cache_blocks(self, task, block_size):
+    def update_cache_blocks(self, task, block_size, num_computed_tokens):
         """
         update cache blocks for a task.
         # TODO(chengyanfu): support async update

@@ -458,12 +462,15 @@ class PrefixCacheManager:
         """
         try:
             req_id = task.request_id
-            num_cached_tokens = task.num_cached_tokens
             block_tables = task.block_tables

-            last_node, input_ids = self.cache_info[req_id]
-            left_input_ids = input_ids[num_cached_tokens:]
+            last_node, num_cached_tokens = self.cache_info[req_id]
+            input_ids = task.prompt_token_ids + task.output_token_ids
+            can_cache_computed_tokens = num_computed_tokens - num_computed_tokens % block_size
+            left_input_ids = input_ids[num_cached_tokens:can_cache_computed_tokens]
             gpu_extra_block_ids = block_tables[num_cached_tokens // block_size :]
+            if req_id in self.leaf_req_map[last_node]:  # delete old leaf record, update later
+                self.leaf_req_map[last_node].remove(req_id)

             with self.request_release_lock:
                 current_time = time.time()

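Only tokens up to a full block boundary are inserted into the cache; the remainder stays uncached until the next block fills. A quick numeric check of that arithmetic with illustrative values:

block_size = 64
num_cached_tokens = 64            # tokens already covered by previously cached blocks
num_computed_tokens = 200         # tokens computed for the request so far
can_cache_computed_tokens = num_computed_tokens - num_computed_tokens % block_size
assert can_cache_computed_tokens == 192           # rounded down to a block boundary
newly_cacheable = can_cache_computed_tokens - num_cached_tokens
assert newly_cacheable == 128                     # i.e. exactly 2 new full blocks
assert newly_cacheable // block_size == 2
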
@@ -479,7 +486,8 @@ class PrefixCacheManager:
             )
             self.req_leaf_map[req_id] = leaf_node
             self.leaf_req_map[leaf_node].add(req_id)
-            self.cache_info[req_id] = (leaf_node, input_ids)
+            self.cache_info[req_id] = (leaf_node, can_cache_computed_tokens)
+            task.cached_block_num = can_cache_computed_tokens // block_size
         except Exception as e:
             logger.error(f"update_cache_blocks, error: {type(e)} {e}")
             raise e

@@ -541,10 +549,9 @@ class PrefixCacheManager:
                         cpu_recv_block_ids=[],
                     )
                 else:
-                    raise Exception("request_match_blocks: Not enough GPU memory to allocate cache for matched CPU Cache")
-
-            # record request cache info
-            self.cache_info[req_id] = (match_block_node, input_ids)
+                    raise Exception(
+                        "request_match_blocks: Not enough GPU memory to allocate cache for matched CPU Cache"
+                    )

             # 3. update metrics
             matched_token_num = gpu_match_token_num + cpu_match_token_num

@@ -568,6 +575,9 @@ class PrefixCacheManager:
             # set leaf node temporarily, then update it in update_cache_blocks
             self.req_leaf_map[req_id] = match_block_node
             self.leaf_req_map[match_block_node].add(req_id)
+            # record request cache info
+            self.cache_info[req_id] = (match_block_node, matched_token_num)
+            task.cached_block_num = matched_token_num // block_size
             return common_block_ids, matched_token_num, hit_info
         except Exception as e:
             logger.error(f"request_match_blocks: request_block_ids: error: {type(e)} {e}")

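request_match_blocks now records the number of matched tokens in cache_info instead of the raw input ids, and derives the count of already-cached blocks from it. A tiny numeric illustration (values are made up, not taken from the diff):

block_size = 64
gpu_match_token_num = 192
cpu_match_token_num = 64
matched_token_num = gpu_match_token_num + cpu_match_token_num
cached_block_num = matched_token_num // block_size
assert cached_block_num == 4   # the first 4 block-table entries back the shared prefix
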
@@ -726,6 +736,7 @@ class PrefixCacheManager:
         except Exception as e:
             logger.error(f"release_block_ids: error: {type(e)} {e}")
             raise e
+
     def free_nodes_directly(self, node):
         """
         Recycle nodes by a query directly.

@@ -848,6 +859,11 @@ class PrefixCacheManager:
                 "free_block_ids_async: after free, " + f"len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
             )

+    def free_block_ids(self, need_block_num):
+        self.free_block_ids_async(need_block_num)
+        while (self.gpu_free_task_future is not None) and (not self.gpu_free_task_future.done()):
+            time.sleep(0.001)
+
     def free_block_ids_async(self, need_block_num):
         """
         free block ids async

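The new free_block_ids is a blocking wrapper: it kicks off the asynchronous reclaim and then polls the stored future until it finishes. A standalone sketch of that submit-then-poll pattern (the executor and function names below are generic placeholders, not the cache manager's real attributes):

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
pending_future = None

def reclaim_async(need_block_num):
    # Simulate background work that frees `need_block_num` blocks.
    global pending_future
    pending_future = executor.submit(time.sleep, 0.01)

def reclaim_blocking(need_block_num):
    reclaim_async(need_block_num)
    while pending_future is not None and not pending_future.done():
        time.sleep(0.001)   # same 1 ms polling interval as the diff

reclaim_blocking(4)
executor.shutdown()
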
@@ -1107,15 +1123,6 @@ class PrefixCacheManager:
                 node.req_id_set.add(req_id)
                 node = node.parent

-    def decrease_request_share_count(self, req_id):
-        """
-        Decrease node shared count
-        """
-        node, input_ids = self.cache_info[req_id]
-        while node != self.radix_tree_root:
-            node.decrement_shared_count()
-            node = node.parent
-
     def build_path(
         self,
         req_id,

@@ -355,9 +355,9 @@ class LLMEngine:
                 self.cfg.max_prefill_batch,
             )

-            self.resource_manager.check_and_free_block_tables()
+            # self.resource_manager.check_and_free_block_tables()
             tasks = self.scheduler.get_requests(
-                available_blocks=self.resource_manager.available_block_num(),
+                available_blocks=self.cfg.cache_config.max_block_num_per_seq,
                 block_size=self.cfg.cache_config.block_size,
                 reserved_output_blocks=self.cfg.cache_config.enc_dec_block_num,
                 max_num_batched_tokens=self.cfg.max_model_len,

@@ -83,7 +83,7 @@ class ResourceManagerV1(ResourceManager):
         return len(request.block_tables) * self.config.cache_config.block_size

     def get_new_block_nums(self, request: Request, num_new_tokens: int):
-        self.check_and_free_block_tables()
+        # self.check_and_free_block_tables()
         return (
             request.num_computed_tokens + num_new_tokens + self.config.cache_config.block_size - 1
         ) // self.config.cache_config.block_size - len(request.block_tables)

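get_new_block_nums is a ceiling division over the tokens the request will occupy after this step, minus the blocks it already holds. A worked example with illustrative numbers:

block_size = 64
num_computed_tokens = 130          # tokens already placed in blocks
num_new_tokens = 20                # tokens scheduled this step
allocated_blocks = 3               # len(request.block_tables)
needed = (num_computed_tokens + num_new_tokens + block_size - 1) // block_size - allocated_blocks
assert needed == 0                 # 150 tokens fit in ceil(150/64) = 3 blocks already held
needed_more = (num_computed_tokens + 70 + block_size - 1) // block_size - allocated_blocks
assert needed_more == 1            # 200 tokens need ceil(200/64) = 4 blocks, one more
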
@@ -118,7 +118,7 @@ class ResourceManagerV1(ResourceManager):
                 preempted_req.status = RequestStatus.PREEMPTED
                 preempted_req.num_computed_tokens = 0
                 self._free_blocks(preempted_req)
-                preempted_req.prefill_block_num = None
+                preempted_req.cached_block_num = 0
                 self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
                 preempted_reqs.append(preempted_req)
                 scheduled_reqs.append(self._prepare_preempt_task(preempted_req))

@@ -234,14 +234,14 @@ class ResourceManagerV1(ResourceManager):
                 if request.num_computed_tokens >= request.need_prefill_tokens:  # to be decoding
                     if request.num_total_tokens > request.need_prefill_tokens:  # has generated tokens
                         request.num_computed_tokens = request.num_total_tokens - 1
-                    else:  # prefill finished
-                        if (
-                            self.config.cache_config.enable_prefix_caching
-                            and request.get("prefill_block_num", None) is None
-                        ):
-                            # update prefill cache blocks for prefix caching
-                            request.prefill_block_num = len(request.block_tables)
-                            self.cache_manager.update_cache_blocks(request, self.config.cache_config.block_size)
+                    # else:  # prefill finished
+                    #     if (
+                    #         self.config.cache_config.enable_prefix_caching
+                    #         and request.get("cached_block_num", None) is None
+                    #     ):
+                    #         # update prefill cache blocks for prefix caching
+                    #         request.cached_block_num = len(request.block_tables)
+                    #         self.cache_manager.update_cache_blocks(request, self.config.cache_config.block_size)
                     if (
                         self.allocated_slots(request) - request.num_total_tokens
                         <= self.config.cache_config.prealloc_dec_block_slot_num_threshold

@@ -291,6 +291,9 @@ class ResourceManagerV1(ResourceManager):
                     scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens))
                     token_budget -= num_new_tokens
                     request.num_computed_tokens += num_new_tokens
+                    self.cache_manager.update_cache_blocks(
+                        request, self.config.cache_config.block_size, request.num_computed_tokens
+                    )
                     req_index += 1
         # schedule the WAITING requests.
         if not preempted_reqs:

@@ -321,6 +324,9 @@ class ResourceManagerV1(ResourceManager):
                         request.schedule_start_time = time.time()
                     token_budget -= num_new_tokens
                     request.num_computed_tokens += num_new_tokens
+                    self.cache_manager.update_cache_blocks(
+                        request, self.config.cache_config.block_size, request.num_computed_tokens
+                    )
                     request.status = RequestStatus.RUNNING
                     allocated_position = self.get_available_position()
                     request.idx = allocated_position

@@ -351,6 +357,9 @@ class ResourceManagerV1(ResourceManager):
                         scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens))
                         token_budget -= num_new_tokens
                         request.num_computed_tokens += num_new_tokens
+                        self.cache_manager.update_cache_blocks(
+                            request, self.config.cache_config.block_size, request.num_computed_tokens
+                        )
                         request.status = RequestStatus.RUNNING
                     else:
                         if self.config.cache_config.enable_prefix_caching:

@@ -427,16 +436,8 @@ class ResourceManagerV1(ResourceManager):

     def _free_blocks(self, request: Request):
         if self.config.cache_config.enable_prefix_caching:
-            # TODO(chengyanfu): support cache ouput blocks for prefix caching
-            if request.get("prefill_block_num", None) is None:
-                leaf_node = self.cache_manager.req_leaf_map[request.request_id]
-                self.cache_manager.decrease_request_share_count(request.request_id)
-                self.cache_manager.free_nodes_directly(leaf_node)
-                self.cache_manager.recycle_gpu_blocks(request.block_tables[request.cache_info[0]:])
-
-            else:
-                self.cache_manager.release_block_ids_async(request)
-                self.cache_manager.recycle_gpu_blocks(request.block_tables[request.prefill_block_num :])
+            self.cache_manager.release_block_ids(request)
+            self.cache_manager.recycle_gpu_blocks(request.block_tables[request.cached_block_num :])
         else:
             self.cache_manager.recycle_gpu_blocks(request.block_tables)
         request.block_tables = []

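With prefix caching enabled, the new _free_blocks hands the request back to the cache manager via release_block_ids and then recycles only the block-table tail beyond cached_block_num; the leading entries back the shared cached prefix and stay under the radix tree's control. A small slicing sketch with illustrative values:

block_tables = [10, 11, 12, 13, 14, 15]   # physical block ids held by the request
cached_block_num = 4                      # first 4 blocks back the shared cached prefix
shared_prefix_blocks = block_tables[:cached_block_num]   # managed by the prefix cache
private_tail_blocks = block_tables[cached_block_num:]    # recycled immediately
assert private_tail_blocks == [14, 15]
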