[Feature] dyc8 support prefixcache (#5125)

* dyc8 support prefixcache

* fix cache_trans test case

* update code
Author: kevin
Date:   2025-11-21 19:46:26 +08:00 (committed via GitHub)
Parent: ab3a2e45ff
Commit: c068a4f642

5 changed files with 272 additions and 121 deletions


@@ -63,7 +63,7 @@ def parse_args():
"--cache_dtype",
type=str,
default="bfloat16",
choices=["uint8", "bfloat16"],
choices=["uint8", "bfloat16", "block_wise_fp8"],
help="cache dtype",
)
parser.add_argument("--key_cache_shape", type=str, default="", help="key cache shape")
@@ -114,6 +114,8 @@ class CacheTransferManager:
         self.cpu_cache_kvs = {}
         self.gpu_cache_k_tensors = []
         self.gpu_cache_v_tensors = []
+        self.gpu_cache_scales_k_tensors = []
+        self.gpu_cache_scales_v_tensors = []
         self.speculative_config = SpeculativeConfig(args.speculative_config)
         self.key_cache_shape = [int(i) for i in args.key_cache_shape.split(",")]
         self.value_cache_shape = []
@@ -131,6 +133,7 @@ class CacheTransferManager:
         self.rank = rank
         self.device = device
         self.engine_pid = args.engine_pid
+        self.cache_dtype = args.cache_dtype
 
         address = (args.pod_ip, args.cache_queue_port)
         self.cache_task_queue = EngineCacheQueue(
@@ -203,12 +206,19 @@ class CacheTransferManager:
                 time.sleep(0.1)
             logger.info(f"[rank {self.rank}/{self.n_ranks}] OK! Stop waiting.")
 
+        if args.cache_dtype == "block_wise_fp8":
+            cache_type = "uint8"
+        else:
+            cache_type = args.cache_dtype
+
         logger.info(f"[rank {self.rank}/{self.n_ranks}] Initializing kv cache for all layers.")
         set_device(self.device)
         for i in range(args.num_layers + self.num_extra_layers):
             num_gpu_blocks = self.num_gpu_blocks if i < args.num_layers else self.num_extra_layer_gpu_blocks
             key_name = f"key_caches_{i}_rank{self.rank}.device{self.device}"
             val_name = f"value_caches_{i}_rank{self.rank}.device{self.device}"
+            key_cache_scales_name = f"key_cache_scales_{i}_rank{self.rank}.device{self.device}"
+            value_cache_scales_name = f"value_cache_scales_{i}_rank{self.rank}.device{self.device}"
             key_cache_shape = [
                 num_gpu_blocks,
                 self.key_cache_shape[1],
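
Reviewer note: the dtype resolution above is the heart of the feature. block_wise_fp8 payloads are physically stored in uint8 buffers, with quantization scales held in separate per-layer tensors. A minimal sketch of that mapping (the helper name is hypothetical, not FastDeploy code):

```python
# Hypothetical helper illustrating the dtype resolution above: block_wise_fp8
# payloads are stored as uint8, and only this mode needs scale tensors.
def resolve_cache_storage(cache_dtype):
    """Return (storage_dtype, needs_scale_tensors) for a --cache_dtype value."""
    if cache_dtype == "block_wise_fp8":
        return "uint8", True
    if cache_dtype in ("uint8", "bfloat16"):
        return cache_dtype, False
    raise ValueError(f"Unsupported cache dtype: {cache_dtype}")


assert resolve_cache_storage("block_wise_fp8") == ("uint8", True)
assert resolve_cache_storage("bfloat16") == ("bfloat16", False)
```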
@@ -227,26 +237,64 @@ class CacheTransferManager:
                 logger.info(
                     f"[rank {self.rank}/{self.n_ranks}] ..creating kv cache for layer {i}: {key_cache_shape} {value_cache_shape}"
                 )
-                key_cache = paddle.full(shape=key_cache_shape, fill_value=0, dtype=args.cache_dtype)
+                key_cache = paddle.full(shape=key_cache_shape, fill_value=0, dtype=cache_type)
                 set_data_ipc(key_cache, key_name)
+                if args.cache_dtype == "block_wise_fp8":
+                    key_cache_scales = paddle.full(
+                        shape=[num_gpu_blocks, self.key_cache_shape[1], self.key_cache_shape[2]],
+                        fill_value=0,
+                        dtype=paddle.get_default_dtype(),
+                    )
+                    set_data_ipc(key_cache_scales, key_cache_scales_name)
                 if self.value_cache_shape:
-                    val_cache = paddle.full(shape=value_cache_shape, fill_value=0, dtype=args.cache_dtype)
+                    val_cache = paddle.full(shape=value_cache_shape, fill_value=0, dtype=cache_type)
                     set_data_ipc(val_cache, val_name)
+                    if args.cache_dtype == "block_wise_fp8":
+                        value_cache_scales = paddle.full(
+                            shape=[num_gpu_blocks, self.value_cache_shape[1], self.value_cache_shape[2]],
+                            fill_value=0,
+                            dtype=paddle.get_default_dtype(),
+                        )
+                        set_data_ipc(value_cache_scales, value_cache_scales_name)
             else:
                 logger.info(
                     f"[rank {self.rank}/{self.n_ranks}] ..attaching kv cache for layer {i}: {key_cache_shape} {value_cache_shape}"
                 )
-                key_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
-                val_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
+                key_cache = paddle.empty(shape=[], dtype=cache_type)
+                val_cache = paddle.empty(shape=[], dtype=cache_type)
                 key_cache = share_external_data_(key_cache, key_name, key_cache_shape, True)
+                if args.cache_dtype == "block_wise_fp8":
+                    key_cache_scales = paddle.empty(shape=[], dtype=paddle.get_default_dtype())
+                    key_cache_scales = share_external_data_(
+                        key_cache_scales,
+                        key_cache_scales_name,
+                        [num_gpu_blocks, self.key_cache_shape[1], self.key_cache_shape[2]],
+                        True,
+                    )
                 if self.value_cache_shape:
                     val_cache = share_external_data_(val_cache, val_name, value_cache_shape, True)
+                    if args.cache_dtype == "block_wise_fp8":
+                        value_cache_scales = paddle.empty(shape=[], dtype=paddle.get_default_dtype())
+                        value_cache_scales = share_external_data_(
+                            value_cache_scales,
+                            value_cache_scales_name,
+                            [num_gpu_blocks, self.value_cache_shape[1], self.value_cache_shape[2]],
+                            True,
+                        )
 
             self.gpu_cache_kvs[key_name] = key_cache
             self.gpu_cache_k_tensors.append(self.gpu_cache_kvs[key_name])
+            if args.cache_dtype == "block_wise_fp8":
+                self.gpu_cache_kvs[key_cache_scales_name] = key_cache_scales
+                self.gpu_cache_scales_k_tensors.append(self.gpu_cache_kvs[key_cache_scales_name])
             if args.value_cache_shape:
                 self.gpu_cache_kvs[val_name] = val_cache
                 self.gpu_cache_v_tensors.append(self.gpu_cache_kvs[val_name])
+                if args.cache_dtype == "block_wise_fp8":
+                    self.gpu_cache_kvs[value_cache_scales_name] = value_cache_scales
+                    self.gpu_cache_scales_v_tensors.append(self.gpu_cache_kvs[value_cache_scales_name])
 
         if args.create_cache_tensor:
             logger.info(f"[rank {self.rank}/{self.n_ranks}] ✅ kv cache is ready!")
@@ -265,12 +313,17 @@ class CacheTransferManager:
         value_cache_size = 0
         if args.cache_dtype == "bfloat16":
             cache_bytes = 2
-        elif args.cache_dtype == "uint8":
+        elif args.cache_dtype == "uint8" or args.cache_dtype == "block_wise_fp8":
             cache_bytes = 1
         else:
             raise ValueError(f"Unsupported cache dtype: {args.cache_dtype}")
         key_need_to_allocate_bytes = args.num_cpu_blocks * cache_bytes * key_cache_size
         value_need_to_allocate_bytes = args.num_cpu_blocks * cache_bytes * value_cache_size
+        if args.cache_dtype == "block_wise_fp8":
+            cache_scales = paddle.empty(shape=[], dtype=paddle.get_default_dtype())
+            cache_scales_size = self.key_cache_shape[1] * self.key_cache_shape[2]
+            scales_key_need_to_allocate_bytes = args.num_cpu_blocks * cache_scales.element_size() * cache_scales_size
+            scales_value_need_to_allocate_bytes = args.num_cpu_blocks * cache_scales.element_size() * cache_scales_size
         logger.info(
             f"[rank {self.rank}/{self.n_ranks}] ..swap space size : {(key_need_to_allocate_bytes + value_need_to_allocate_bytes) / 1024 ** 3:.2f}GB"
         )
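
Reviewer note on the sizing arithmetic: fp8 payloads cost one byte per element, while the scales cost element_size() of the default dtype (two bytes for bfloat16) per key_cache_shape[1] * key_cache_shape[2] entries of each block. A back-of-envelope check with hypothetical dimensions:

```python
# Back-of-envelope swap-space sizing; all dimensions here are hypothetical.
num_cpu_blocks = 1024
dim1, dim2, head_dim = 8, 64, 128        # stand-ins for key_cache_shape[1:]
key_cache_size = dim1 * dim2 * head_dim  # elements per block

payload_bytes = num_cpu_blocks * 1 * key_cache_size  # uint8: 1 byte/element
scales_bytes = num_cpu_blocks * 2 * dim1 * dim2      # bfloat16 scales: 2 bytes

print(f"payload: {payload_bytes / 1024**2:.1f} MiB")  # 64.0 MiB
print(f"scales:  {scales_bytes / 1024**2:.1f} MiB")   # 1.0 MiB
```

With these numbers the scale buffers add under 2% on top of the payload swap space.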
@@ -282,17 +335,27 @@ class CacheTransferManager:
         paddle.set_device("cpu")
         self.k_dst_ptrs = []
         self.v_dst_ptrs = []
+        self.k_scales_ptrs = []
+        self.v_scales_ptrs = []
         for i in range(args.num_layers + self.num_extra_layers):
             key_name = f"key_caches_{i}_rank{self.rank}"
             val_name = f"value_caches_{i}_rank{self.rank}"
+            key_cache_scales_name = f"key_cache_scales_{i}_rank{self.rank}"
+            value_cache_scales_name = f"value_cache_scales_{i}_rank{self.rank}"
             logger.info(
                 f"[rank {self.rank}/{self.n_ranks}] ..creating cpu cache for layer {i}: {(key_need_to_allocate_bytes + value_need_to_allocate_bytes) / 1024 ** 3:.2f}GB"
             )
             self.cpu_cache_kvs[key_name] = cuda_host_alloc(key_need_to_allocate_bytes)
             self.k_dst_ptrs.append(self.cpu_cache_kvs[key_name])
+            if args.cache_dtype == "block_wise_fp8":
+                self.cpu_cache_kvs[key_cache_scales_name] = cuda_host_alloc(scales_key_need_to_allocate_bytes)
+                self.k_scales_ptrs.append(self.cpu_cache_kvs[key_cache_scales_name])
             if value_need_to_allocate_bytes > 0:
                 self.cpu_cache_kvs[val_name] = cuda_host_alloc(value_need_to_allocate_bytes)
                 self.v_dst_ptrs.append(self.cpu_cache_kvs[val_name])
+                if args.cache_dtype == "block_wise_fp8":
+                    self.cpu_cache_kvs[value_cache_scales_name] = cuda_host_alloc(scales_value_need_to_allocate_bytes)
+                    self.v_scales_ptrs.append(self.cpu_cache_kvs[value_cache_scales_name])
 
         logger.info(f"[rank {self.rank}/{self.n_ranks}] ✅ swap space (cpu cache) is ready!")
         self.swap_space_ready_signal.value[self.rank] = 1
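
Reviewer note: each layer's CPU swap space is pinned host memory from cuda_host_alloc, and in block_wise_fp8 mode the scales get their own buffers so payloads and scales can be swapped independently. A condensed sketch of the per-layer bookkeeping (the helper name is hypothetical; cuda_host_alloc is assumed in scope as in this module, returning a raw pointer):

```python
# Condensed sketch of the per-layer pinned-host allocation above.
def alloc_layer_swap_space(layer_id, rank, payload_bytes, scales_bytes, is_fp8):
    bufs = {}
    key_name = f"key_caches_{layer_id}_rank{rank}"
    bufs[key_name] = cuda_host_alloc(payload_bytes)
    if is_fp8:
        # Scales live in their own pinned buffer so they can be swapped
        # separately from the uint8 payload.
        scales_name = f"key_cache_scales_{layer_id}_rank{rank}"
        bufs[scales_name] = cuda_host_alloc(scales_bytes)
    return bufs
```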
@@ -492,6 +555,25 @@ class CacheTransferManager:
                 self.device,
                 0,
             )
+            if self.cache_dtype == "block_wise_fp8":
+                swap_cache_all_layers(
+                    self.gpu_cache_scales_k_tensors,
+                    self.k_scales_ptrs,
+                    self.num_cpu_blocks,
+                    gpu_block_ids,
+                    cpu_block_ids,
+                    self.device,
+                    0,
+                )
+                swap_cache_all_layers(
+                    self.gpu_cache_scales_v_tensors,
+                    self.v_scales_ptrs,
+                    self.num_cpu_blocks,
+                    gpu_block_ids,
+                    cpu_block_ids,
+                    self.device,
+                    0,
+                )
         elif event_type.value == CacheStatus.SWAP2GPU.value:
             swap_cache_all_layers(
@@ -512,6 +594,25 @@ class CacheTransferManager:
                 self.device,
                 1,
             )
+            if self.cache_dtype == "block_wise_fp8":
+                swap_cache_all_layers(
+                    self.gpu_cache_scales_k_tensors,
+                    self.k_scales_ptrs,
+                    self.num_cpu_blocks,
+                    gpu_block_ids,
+                    cpu_block_ids,
+                    self.device,
+                    1,
+                )
+                swap_cache_all_layers(
+                    self.gpu_cache_scales_v_tensors,
+                    self.v_scales_ptrs,
+                    self.num_cpu_blocks,
+                    gpu_block_ids,
+                    cpu_block_ids,
+                    self.device,
+                    1,
+                )
         else:
             logger.warning(
                 f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported"