Mirror of https://github.com/PaddlePaddle/FastDeploy.git, synced 2025-10-05 16:48:03 +08:00
[feat] add metrics for yiyan adapter (#3219)
* [feat] add metrics for yiyan adapter
* [fix] fix metrics num_requests_waiting and num_requests_running
* [fix] fix metrics gpu_cache_usage_perc
* [refactor] change where requests_number increases
* [chore] rename xxx_block_num as xxx_gpu_block_num, and update their values accordingly
* [chore] delete useless code
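The changes below read and update a set of process-level metrics through a shared `main_process_metrics` object. As a rough sketch of what such a registry could look like (assuming the `prometheus_client` library; the attribute names follow the calls in the diff, while the exported metric names and help strings here are illustrative, not FastDeploy's actual definitions):

```python
# Hypothetical sketch of the metrics registry used below, assuming prometheus_client.
# Only the attributes referenced in this commit are shown; the real FastDeploy
# definitions (metric names, labels, registry) may differ.
from prometheus_client import Counter, Gauge


class MainProcessMetrics:
    def __init__(self):
        # Gauges hold point-in-time values and are overwritten with .set()
        self.max_batch_size = Gauge("fastdeploy_max_batch_size", "Maximum number of request slots")
        self.batch_size = Gauge("fastdeploy_batch_size", "Number of request slots currently occupied")
        self.available_gpu_block_num = Gauge("fastdeploy_available_gpu_block_num", "Free KV-cache blocks on GPU")
        self.gpu_cache_usage_perc = Gauge("fastdeploy_gpu_cache_usage_perc", "Fraction of GPU KV-cache blocks in use")
        # Counters are monotonic totals and are bumped with .inc()
        self.prefix_cache_token_num = Counter("fastdeploy_prefix_cache_token_num", "Tokens served from the prefix cache")
        self.prefix_gpu_cache_token_num = Counter("fastdeploy_prefix_gpu_cache_token_num", "Prefix-cache tokens hit in GPU blocks")
        self.prefix_cpu_cache_token_num = Counter("fastdeploy_prefix_cpu_cache_token_num", "Prefix-cache tokens hit in CPU blocks")


main_process_metrics = MainProcessMetrics()
```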
@@ -57,14 +57,15 @@ class ResourceManager:
        self.logger = llm_logger
        self.cfg = config.cache_config
        self.max_num_seqs = max_num_seqs
        self.stop_flags = [True] * max_num_seqs  # flag set to true if the slot has not been taken
        self.enable_prefix_cache = config.cache_config.enable_prefix_caching
        self.cache_manager = PrefixCacheManager(config, tensor_parallel_size, splitwise_role, local_data_parallel_id)
        self.tasks_list = [None] * max_num_seqs  # task slots
        self.req_dict = dict()
        # current batch status of the engine
        self.real_bsz = 0
        self.logger.info(f"{self.info()}")
        main_process_metrics.max_batch_size.set(max_num_seqs)

    def reset_cache_config(self, cfg):
        """
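`stop_flags` and `tasks_list` are parallel, slot-indexed lists: a slot is free while its flag is `True` and holds the running task otherwise, and `max_batch_size` is published once at construction time. A toy, self-contained illustration of that bookkeeping and of the quantity later exported as `batch_size` (the helper names are mine, not FastDeploy's):

```python
# Simplified view of the slot bookkeeping; helper names are illustrative.
max_num_seqs = 8
stop_flags = [True] * max_num_seqs   # True -> slot is free
tasks_list = [None] * max_num_seqs   # parallel list holding the task per slot


def available_batch():
    """Number of free slots."""
    return sum(stop_flags)


def occupy(slot, task):
    """Place a task into a slot and mark the slot as taken."""
    tasks_list[slot] = task
    stop_flags[slot] = False


occupy(0, "req-a")
occupy(3, "req-b")
# The batch_size gauge set later in this commit is exactly "slots in use":
assert max_num_seqs - available_batch() == 2
```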
@@ -228,30 +229,31 @@ class ResourceManager:
        Returns:
            list: processed task list
        """

        llm_logger.debug(f"Allocating resources for a batch of new tasks: {tasks}")
        allocated_position = 0  # number of tasks that have been allocated, also the position in request slots
        processing_task_index = 0  # current task
        processed_tasks = list()
        while allocated_position < self.max_num_seqs:  # loop until all tasks have been allocated resources
            if processing_task_index >= len(tasks):  # if all tasks have been tried, don't give a second chance
                break

            can_insert = False
            while allocated_position + 1 <= self.max_num_seqs:
                if sum(self.stop_flags[allocated_position : allocated_position + 1]) == 1:
                    can_insert = True  # if there is an empty slot, try to allocate resources for the current task
                    break
                allocated_position += 1
            if can_insert:
                if self.stop_flags[allocated_position]:

                    task = tasks[processing_task_index]  # retrieve current task

                    if task.get("seed") is None:
                        task.set("seed", random.randint(0, 9223372036854775807))
                    task.idx = allocated_position

                    if self.enable_prefix_cache:  # if prefix caching is enabled
                        # 1. request enough blocks for the current task
                        cache_prepare_time = time.time()
                        common_block_ids, unique_block_ids, hit_info = self.cache_manager.request_block_ids(
                            task, self.cfg.block_size, self.cfg.dec_token_num
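The inner `while` scans `stop_flags` from the current position for the first free slot; `sum(self.stop_flags[pos : pos + 1]) == 1` is just an indirect way of testing `stop_flags[pos]`, and the seed bound 9223372036854775807 is 2**63 - 1. A simplified stand-alone version of the same scan (the function name is illustrative):

```python
import random


def find_free_slot(stop_flags, start=0):
    """Return the index of the first free slot at or after start, else None.

    Equivalent to the scan above: sum(stop_flags[pos : pos + 1]) == 1 is the
    same as stop_flags[pos] being True.
    """
    for pos in range(start, len(stop_flags)):
        if stop_flags[pos]:
            return pos
    return None


stop_flags = [False, False, True, True]  # slots 0 and 1 are already taken
assert find_free_slot(stop_flags) == 2

# Default seed assigned when a request does not carry one: a random 63-bit integer.
seed = random.randint(0, 2**63 - 1)  # 2**63 - 1 == 9223372036854775807
```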
@@ -259,14 +261,15 @@ class ResourceManager:
                        if unique_block_ids is None:
                            self.logger.warning("req_id: {0} not enough blocks available".format(task["req_id"]))
                            return

                        # 2. record cache hit information, and return the number of tokens already in cache
                        cached_len = self._record_request_cache_info(
                            task, common_block_ids, unique_block_ids, hit_info
                        )
                        task.cache_prepare_time = time.time() - cache_prepare_time

                        # 3. if prefill/decode disaggregation is enabled
                        if task.disaggregate_info is not None:
                            if task.disaggregate_info["role"] == "prefill":
                                # record the slot position for current task, indexed by request id
                                self.req_dict[task.request_id] = allocated_position
                                task.disaggregate_info["block_tables"] = task.block_tables
                                self._delete_cached_data(task, cached_len)
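In the prefix-cache branch, the tokens covered by `common_block_ids` are already resident in the KV cache and do not need to be prefilled again; `_delete_cached_data` presumably drops them from the prompt, which is what the later comment "remove cached tokens from prompt token ids to avoid kv recomputation" suggests. A hedged sketch of that trimming, assuming `cached_len` counts whole cached blocks (the helper below is illustrative, not the actual `_delete_cached_data`):

```python
# Illustrative only: a guess at the kind of trimming _delete_cached_data performs,
# based on cached_len = len(common_block_ids) * block_size elsewhere in the diff.
def delete_cached_data(prompt_token_ids, cached_len):
    """Drop the first cached_len tokens, which are already covered by cached blocks."""
    return prompt_token_ids[cached_len:]


block_size = 64
common_block_ids = [3, 7, 9]                      # blocks shared with earlier requests
cached_len = len(common_block_ids) * block_size   # 192 tokens need no recomputation

prompt = list(range(300))                         # stand-in token ids
remaining = delete_cached_data(prompt, cached_len)
assert len(remaining) == 300 - cached_len
```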
@@ -274,17 +277,19 @@ class ResourceManager:
                                self.req_dict[task.request_id] = allocated_position
                                task.disaggregate_info["block_tables"] = task.need_block_tables
                        else:
                            # remove cached tokens from prompt token ids to avoid kv recomputation
                            self._delete_cached_data(task, cached_len)

                    else:  # if prefix caching is disabled
                        # 1. directly allocate empty blocks from the cache, if there are any
                        block_tables = self._get_block_tables(task.prompt_token_ids_len)
                        if not block_tables:
                            llm_logger.error(f"req_id: {task.request_id} block_tables is empty")
                            continue  # retry
                        else:
                            task.block_tables = block_tables
                            task.need_block_tables = task.block_tables

                        # 2. if prefill/decode disaggregation is enabled
                        if task.disaggregate_info is not None:
                            task.disaggregate_info["block_tables"] = block_tables
                            if task.disaggregate_info["role"] == "prefill":
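Without prefix caching, the prompt is given fresh blocks by `_get_block_tables`. The diff does not show how the block count is derived, but a plausible scheme, given that the config carries `block_size` and `dec_token_num`, is to round the prompt length plus the decode budget up to whole blocks; a hypothetical sketch:

```python
import math


# Hypothetical block-count calculation; FastDeploy's _get_block_tables may differ.
def blocks_needed(prompt_token_ids_len, block_size, dec_token_num):
    """Blocks to cover the prompt plus the decode budget, rounded up to whole blocks."""
    return math.ceil((prompt_token_ids_len + dec_token_num) / block_size)


assert blocks_needed(prompt_token_ids_len=130, block_size=64, dec_token_num=32) == 3
```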
@@ -292,8 +297,8 @@ class ResourceManager:
                            elif task.disaggregate_info["role"] == "decode":
                                self.req_dict[task.request_id] = allocated_position

                    processed_tasks.append(task)  # add current task
                    self.stop_flags[allocated_position] = False  # mark the slot as occupied
                    task.inference_start_time = time.time()
                    task.inference_time_cost = -1.0
                    task.tokens_all_num = 0
@@ -307,11 +312,18 @@ class ResourceManager:
                    processing_task_index += 1

        # determine the batch size from the index of the highest occupied slot
        for i in range(self.max_num_seqs - 1, -1, -1):
            if not self.stop_flags[i]:
                self.real_bsz = i + 1
                break

        # record batch size and resource usage metrics here
        task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())

        self.logger.info(
            f"Number of allocated requests: {len(tasks)}, number of running requests in worker: {self.real_bsz}"
        )
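`real_bsz` is taken from the highest occupied slot index (the backwards scan stops at the first flag that is `False`), and the gauges come from simple block arithmetic: blocks held by live tasks versus the total. A compact sketch of both calculations with made-up numbers; `gpu_cache_usage_perc` is assumed to be the used-to-total block ratio, which the diff does not show:

```python
stop_flags = [False, True, False, True, True, True, True, True]  # slots 0 and 2 occupied
max_num_seqs = len(stop_flags)

# Highest occupied slot index + 1, matching the backwards scan above.
real_bsz = 0
for i in range(max_num_seqs - 1, -1, -1):
    if not stop_flags[i]:
        real_bsz = i + 1
        break
assert real_bsz == 3  # slot 2 is the highest occupied slot

# Block accounting behind the gauges (numbers are illustrative).
total_block_number = 1000
block_tables_per_slot = [[1, 2, 3], None, [4, 5], None, None, None, None, None]
task_used_block_num = sum(len(bt) if bt else 0 for bt in block_tables_per_slot)

available_gpu_block_num = total_block_number - task_used_block_num  # 995
gpu_cache_usage_perc = task_used_block_num / total_block_number     # 0.005 (assumed definition)
assert available_gpu_block_num == 995
```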
@@ -343,6 +355,11 @@ class ResourceManager:
        task.cpu_cache_token_num = hit_info["cpu_cache_blocks"] * self.cfg.block_size
        task.cache_info = (cache_block_num, no_cache_block_num)

        # Report the number of cached tokens to Prometheus metrics
        main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
        main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
        main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)

        cached_len = len(common_block_ids) * self.cfg.block_size
        task.block_tables = common_block_ids + unique_block_ids
        task.need_block_tables = unique_block_ids
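`hit_info` reports prefix-cache hits in blocks, and `_record_request_cache_info` converts them to token counts before bumping the Prometheus counters. A hedged sketch of that conversion, assuming `hit_info` carries `gpu_cache_blocks` and `cpu_cache_blocks` as shown above and that `num_cached_tokens` is their sum (the diff does not show how it is computed):

```python
block_size = 64
hit_info = {"gpu_cache_blocks": 4, "cpu_cache_blocks": 1}  # example hit report

gpu_cache_token_num = hit_info["gpu_cache_blocks"] * block_size  # 256 tokens hit in GPU blocks
cpu_cache_token_num = hit_info["cpu_cache_blocks"] * block_size  # 64 tokens hit in CPU blocks
num_cached_tokens = gpu_cache_token_num + cpu_cache_token_num    # assumed total of cached tokens

# These totals feed monotonic counters, hence .inc() rather than .set() in the commit:
# main_process_metrics.prefix_cache_token_num.inc(num_cached_tokens)
# main_process_metrics.prefix_gpu_cache_token_num.inc(gpu_cache_token_num)
# main_process_metrics.prefix_cpu_cache_token_num.inc(cpu_cache_token_num)
assert num_cached_tokens == 320
```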