diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 3b83306de..aad0a624d 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -231,74 +231,70 @@ class ResourceManager: break can_insert = False - while allocated_position + 1 <= self.max_num_seqs: + while allocated_position < self.max_num_seqs: if sum(self.stop_flags[allocated_position : allocated_position + 1]) == 1: can_insert = True break allocated_position += 1 if can_insert: - if self.stop_flags[allocated_position]: + task = tasks[processing_task_index] - task = tasks[processing_task_index] + if task.get("seed") is None: + task.set("seed", random.randint(0, 9223372036854775807)) + task.idx = allocated_position - if task.get("seed") is None: - task.set("seed", random.randint(0, 9223372036854775807)) - task.idx = allocated_position - - if self.enable_prefix_cache: - cache_prepare_time = time.time() - common_block_ids, unique_block_ids, hit_info = self.cache_manager.request_block_ids( - task, - self.cfg.block_size, - self.cfg.dec_token_num, - ) - if unique_block_ids is None: - llm_logger.warning("req_id: {0} not enough blocks available".format(task["req_id"])) - return - - cached_len = self._record_request_cache_info( - task, common_block_ids, unique_block_ids, hit_info - ) - task.cache_prepare_time = time.time() - cache_prepare_time - - if task.disaggregate_info is not None: - if task.disaggregate_info["role"] == "prefill": - self.req_dict[task.request_id] = allocated_position - task.disaggregate_info["block_tables"] = task.block_tables - self._delete_cached_data(task, cached_len) - elif task.disaggregate_info["role"] == "decode": - self.req_dict[task.request_id] = allocated_position - task.disaggregate_info["block_tables"] = task.need_block_tables - else: - self._delete_cached_data(task, cached_len) - - else: - block_tables = self._get_block_tables(task.prompt_token_ids_len) - if not block_tables: - llm_logger.error(f"req_id: {task.request_id} block_tables is empty") - continue - else: - task.block_tables = block_tables - task.need_block_tables = task.block_tables - - if task.disaggregate_info is not None: - task.disaggregate_info["block_tables"] = block_tables - if task.disaggregate_info["role"] == "prefill": - self.req_dict[task.request_id] = allocated_position - elif task.disaggregate_info["role"] == "decode": - self.req_dict[task.request_id] = allocated_position - - processed_tasks.append(task) - self.stop_flags[allocated_position] = False - task.inference_start_time = time.time() - task.inference_time_cost = -1.0 - task.tokens_all_num = 0 - self.tasks_list[allocated_position] = task - llm_logger.info( - f"Allocate request: {task.request_id}, " - f"allocated_position:{allocated_position}, " - f"length of prompt token: {task.prompt_token_ids_len}" + if self.enable_prefix_cache: + cache_prepare_time = time.time() + common_block_ids, unique_block_ids, hit_info = self.cache_manager.request_block_ids( + task, + self.cfg.block_size, + self.cfg.dec_token_num, ) + if unique_block_ids is None: + llm_logger.warning("req_id: {0} not enough blocks available".format(task["req_id"])) + return + + cached_len = self._record_request_cache_info(task, common_block_ids, unique_block_ids, hit_info) + task.cache_prepare_time = time.time() - cache_prepare_time + + if task.disaggregate_info is not None: + if task.disaggregate_info["role"] == "prefill": + self.req_dict[task.request_id] = allocated_position + task.disaggregate_info["block_tables"] = task.block_tables + self._delete_cached_data(task, cached_len) + elif task.disaggregate_info["role"] == "decode": + self.req_dict[task.request_id] = allocated_position + task.disaggregate_info["block_tables"] = task.need_block_tables + else: + self._delete_cached_data(task, cached_len) + + else: + block_tables = self._get_block_tables(task.prompt_token_ids_len) + if not block_tables: + llm_logger.error(f"req_id: {task.request_id} block_tables is empty") + continue + else: + task.block_tables = block_tables + task.need_block_tables = task.block_tables + + if task.disaggregate_info is not None: + task.disaggregate_info["block_tables"] = block_tables + if task.disaggregate_info["role"] == "prefill": + self.req_dict[task.request_id] = allocated_position + elif task.disaggregate_info["role"] == "decode": + self.req_dict[task.request_id] = allocated_position + + processed_tasks.append(task) + self.stop_flags[allocated_position] = False + task.inference_start_time = time.time() + task.inference_time_cost = -1.0 + task.tokens_all_num = 0 + self.tasks_list[allocated_position] = task + llm_logger.info( + f"Allocate request: {task.request_id}, " + f"allocated_position:{allocated_position}, " + f"length of prompt token: {task.prompt_token_ids_len}" + ) allocated_position += 1 processing_task_index += 1