From c9b47f90ce1fad6fdeb508c746c6ca70dc097ea8 Mon Sep 17 00:00:00 2001
From: kevin
Date: Tue, 16 Dec 2025 14:21:42 +0800
Subject: [PATCH] [BugFix] fix cpu prefix cache bug (#5544)

* fix_dy_c8_bug
* add block_num check
* fix test case
* update ci case

---
 .../cache_manager/cache_transfer_manager.py      |  8 ++++++++
 fastdeploy/cache_manager/prefix_cache_manager.py |  1 +
 fastdeploy/config.py                             | 16 ++++++++++------
 .../cache_manager/test_cache_transfer_manager.py |  1 +
 tests/engine/test_common_engine.py               |  5 +++--
 5 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/fastdeploy/cache_manager/cache_transfer_manager.py b/fastdeploy/cache_manager/cache_transfer_manager.py
index b2b8218c8..f2d19393a 100644
--- a/fastdeploy/cache_manager/cache_transfer_manager.py
+++ b/fastdeploy/cache_manager/cache_transfer_manager.py
@@ -66,6 +66,13 @@ def parse_args():
         choices=["uint8", "bfloat16", "block_wise_fp8"],
         help="cache dtype",
     )
+    parser.add_argument(
+        "--default_dtype",
+        type=str,
+        default="bfloat16",
+        choices=["float16", "bfloat16", "uint8"],
+        help="Paddle default dtype; swap_cache_batch only supports float16, bfloat16 and uint8 for now",
+    )
     parser.add_argument("--key_cache_shape", type=str, default="", help="key cache shape")
     parser.add_argument("--value_cache_shape", type=str, default="", help="value cache shape")
     parser.add_argument("--cache_queue_port", type=int, default=9923, help="cache queue port")
@@ -124,6 +131,7 @@ class CacheTransferManager:
         self.num_gpu_blocks = self.key_cache_shape[0]
         self.num_extra_layers = self.speculative_config.num_extra_cache_layer
         self.num_extra_layer_gpu_blocks = int(self.num_gpu_blocks * self.speculative_config.num_gpu_block_expand_ratio)
+        paddle.set_default_dtype(args.default_dtype)
 
         self.swap_to_cpu_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
         self.swap_to_gpu_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index a3c610965..90a997cc3 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -279,6 +279,7 @@ class PrefixCacheManager:
                 + f" --local_data_parallel_id {self.local_data_parallel_id}"
                 + f" --rdma_port {cache_config.rdma_comm_ports[i] if cache_config.rdma_comm_ports is not None else '0'}"
                 + f" --speculative_config '{self.speculative_config.to_json_string()}'"
+                + f" --default_dtype '{self.config.model_config.dtype}'"
                 + (" --create_cache_tensor" if create_cache_tensor else "")
                 + f" >{log_dir}/launch_cache_transfer_manager_tprank{i}.log 2>&1"
             )
diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index 974860e03..4b4bc0aef 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -1355,9 +1355,11 @@ class CacheConfig:
                 self.prefill_kvcache_block_num = self.total_block_num
             else:
                 self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
-            assert (
-                self.prefill_kvcache_block_num >= self.max_block_num_per_seq
-            ), f"current block number :{self.prefill_kvcache_block_num} should be greater than or equal to current model len needed minimum block number :{self.max_block_num_per_seq}"
+            assert self.prefill_kvcache_block_num >= self.max_block_num_per_seq + self.enc_dec_block_num, (
+                f"prefill_kvcache_block_num: {self.prefill_kvcache_block_num} should be greater "
+                f"than or equal to {self.max_block_num_per_seq + self.enc_dec_block_num}, please reduce "
+                "max_model_len or increase num_gpu_blocks_override"
+            )
         else:
             length = num_total_tokens // number_of_tasks
             block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size
@@ -1378,9 +1380,11 @@ class CacheConfig:
                 f"Reset block num, the total_block_num:{self.total_block_num},"
                 f" prefill_kvcache_block_num:{self.prefill_kvcache_block_num}"
             )
-            assert (
-                self.prefill_kvcache_block_num >= self.max_block_num_per_seq
-            ), f"current block number :{self.prefill_kvcache_block_num} should be greater than or equal to current model len needed minimum block number :{self.max_block_num_per_seq}"
+            assert self.prefill_kvcache_block_num >= self.max_block_num_per_seq + self.enc_dec_block_num, (
+                f"current device block num: {self.prefill_kvcache_block_num} "
+                f"should be greater than or equal to {self.max_block_num_per_seq + self.enc_dec_block_num}, please reduce "
+                "max_model_len or use GPUs with more memory"
+            )
 
     def print(self):
         """
diff --git a/tests/cache_manager/test_cache_transfer_manager.py b/tests/cache_manager/test_cache_transfer_manager.py
index 4ae824950..8c74edcd5 100644
--- a/tests/cache_manager/test_cache_transfer_manager.py
+++ b/tests/cache_manager/test_cache_transfer_manager.py
@@ -40,6 +40,7 @@ class Args:
     value_cache_shape = ""
     create_cache_tensor = False
     cache_dtype = "bfloat16"
+    default_dtype = "bfloat16"
 
 
 # ==========================
diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py
index a3ed0270e..15f4d2612 100644
--- a/tests/engine/test_common_engine.py
+++ b/tests/engine/test_common_engine.py
@@ -253,8 +253,9 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase):
         with patch("fastdeploy.engine.args_utils.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0):
             cfg = self._make_cfg(
                 splitwise_role="prefill",
-                num_gpu_blocks_override=3,
+                num_gpu_blocks_override=4,
                 router="0.0.0.0:30000",
+                kv_cache_ratio=1,
             )
 
             # Patch EngineWorkerQueue before EngineService ctor to avoid real IPC
@@ -337,7 +338,7 @@
 
     def test_start_mixed_branch_cache_after_load_and_zmq(self):
         """Cover lines 215-217 and 231 in start()."""
-        cfg = self._make_cfg(splitwise_role="mixed", num_gpu_blocks_override=2)
+        cfg = self._make_cfg(splitwise_role="mixed", num_gpu_blocks_override=4)
 
         class DummyQ:
             def __init__(self, *a, **k):
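
Reviewer note (not part of the patch): a minimal sketch of the process-isolation behavior the new --default_dtype flag works around, assuming standard Paddle 2.x semantics where tensor-creation ops without an explicit dtype fall back to the process-wide default. The cache transfer manager is launched as a separate process by prefix_cache_manager, so it starts from Paddle's stock float32 default rather than the engine's model dtype; that is why the launch command line now forwards model_config.dtype.

    import paddle

    # A freshly spawned Python process starts from Paddle's stock default.
    print(paddle.get_default_dtype())   # "float32"
    a = paddle.zeros([2, 2])            # allocated as float32

    # What CacheTransferManager now does with args.default_dtype: align the
    # process-wide default with the engine's model dtype before any swap
    # buffers are created.
    paddle.set_default_dtype("bfloat16")
    b = paddle.zeros([2, 2])
    print(b.dtype)                      # paddle.bfloat16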
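Reviewer note (illustrative numbers, not from the patch): the tightened CacheConfig check also counts the enc_dec_block_num blocks reserved per sequence toward the minimum, which is why the CI cases raise num_gpu_blocks_override. The max_block_num_per_seq formula below is an assumption (ceiling of max_model_len over block_size) used only to make the arithmetic concrete.

    # Hypothetical values chosen to show a case the old check let through.
    block_size = 64
    max_model_len = 256
    enc_dec_block_num = 2
    kv_cache_ratio = 1.0

    max_block_num_per_seq = (max_model_len + block_size - 1) // block_size  # 4
    total_block_num = 5
    prefill_kvcache_block_num = int(total_block_num * kv_cache_ratio)       # 5

    # Old check passed even though the reserved decoder blocks could not fit;
    # the new check fails fast with an actionable message instead.
    print(prefill_kvcache_block_num >= max_block_num_per_seq)                        # True
    print(prefill_kvcache_block_num >= max_block_num_per_seq + enc_dec_block_num)    # False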