[ATTENTION] unittest (#4962)

周周周
2025-11-14 13:45:53 +08:00
committed by GitHub
parent 91d34c2e35
commit c0a4393d72
2 changed files with 182 additions and 246 deletions

View File

@@ -17,7 +17,7 @@
"IsDynamicC8"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16],
"HEAD_DIM": [128],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],
@@ -54,7 +54,7 @@
"ENABLE_PREFILL"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16],
"HEAD_DIM": [128],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],
@@ -89,7 +89,7 @@
"ENABLE_PREFILL"
],
"dispatch_params": {
"GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16],
"GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16],
"HEAD_DIM": [64,128],
"BLOCK_SIZE": [64],
"CAUSAL": [0, 1],

View File

@@ -18,12 +18,11 @@ import json
import os
import shutil
import tempfile
import time
import types
import unittest
import numpy as np
import paddle
from paddle import nn
import paddle.device.cuda.graphs as graphs
from fastdeploy.config import (
CacheConfig,
@@ -46,10 +45,12 @@ from fastdeploy.model_executor.layers.attention import (
from fastdeploy.model_executor.layers.attention.append_attn_backend import (
allocate_launch_related_buffer,
)
from fastdeploy.model_executor.layers.quantization import parse_quant_config
from fastdeploy.model_executor.layers.quantization.mix_quant import MixQuantConfig
from fastdeploy.model_executor.layers.rotary_embedding import get_rope
from fastdeploy.model_executor.models.ernie4_5_moe import Ernie4_5_Attention
from fastdeploy.model_executor.pre_and_post_process import pre_process
from fastdeploy.model_executor.ops.gpu import get_padding_offset
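# setdefault: request C++17 for the JIT-compiled GEMM kernels (the DG_ prefix suggests DeepGEMM) unless the caller already set it.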
os.environ.setdefault("DG_NVCC_OVERRIDE_CPP_STANDARD", "17")
class TestAttentionPerformance(unittest.TestCase):
@@ -65,20 +66,8 @@ class TestAttentionPerformance(unittest.TestCase):
self.model_dir = self.create_model_config_json()
self.fd_config = self.create_fd_config_from_model_path(self.model_dir, tensor_parallel_size=1)
# Adjust config for the test
self.fd_config.model_config.max_model_len = 2 * (9000 + 128)
self.fd_config.model_config.num_hidden_layers = 1
self.fd_config.parallel_config.tp_group = [0]
# Mock quantization config
mock_args = types.SimpleNamespace()
mock_args.quantization = None
# NOTE: To run block_wise_fp8 for the dense GEMM, use the line below and also enable the quantization-related options in the config.
# mock_args.quantization = {"quantization": "block_wise_fp8"}
mock_args.dynamic_load_weight = False
quant_config = parse_quant_config(mock_args, self.fd_config.model_config, is_ernie=1, is_v1_loader=1)
self.fd_config.quant_config = quant_config
# Initialize Attention Layer
os.environ["FD_ATTENTION_BACKEND"] = "APPEND_ATTN"
attn_cls = get_attention_backend()
@@ -92,10 +81,23 @@ class TestAttentionPerformance(unittest.TestCase):
encoder_block_shape_q=64,
decoder_block_shape_q=16,
)
self.attention_layer = Ernie4_5_Attention(self.fd_config, layer_id=0, prefix="test_layer")
state_dict = self.create_random_attention_state_dict(self.fd_config, prefix="test_layer")
self.attention_layer.load_state_dict(state_dict)
self.attention_layer.attn.cache_quant_type_str = "block_wise_fp8"
num_layers = self.fd_config.model_config.num_hidden_layers
self.attention_layer = [None] * num_layers
for i in range(num_layers):
self.attention_layer[i] = Ernie4_5_Attention(self.fd_config, layer_id=i, prefix="test_layer")
state_dict = self.create_random_attention_state_dict(self.fd_config, prefix="test_layer")
self.attention_layer[i].load_state_dict(state_dict)
self.attention_layer[i].attn.cache_quant_type_str = "block_wise_fp8"
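# Chain the per-layer attention modules so a single call runs the attention of every hidden layer in sequence.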
def attn_forward(forward_meta, hidden_states):
for i in range(num_layers):
hidden_states = self.attention_layer[i](forward_meta, hidden_states)
return hidden_states
self.attn_forward = attn_forward
print("===== Initialization Complete =====")
def tearDown(self):
@@ -114,42 +116,13 @@ class TestAttentionPerformance(unittest.TestCase):
"""
config_dict = {
"architectures": ["Ernie4_5_MoeForCausalLM"],
"bos_token_id": 1,
"eos_token_id": 2,
"dtype": "bfloat16",
"hidden_act": "silu",
"hidden_size": 8192,
"intermediate_size": 28672,
"hidden_size": 1536,
"max_position_embeddings": 131072,
"model_type": "ernie4_5_moe",
"num_attention_heads": 64,
"num_key_value_heads": 8,
"num_hidden_layers": 5,
"pad_token_id": 0,
"rms_norm_eps": 1e-05,
"use_cache": False,
"vocab_size": 103424,
"rope_theta": 500000,
"use_rmsnorm": True,
"use_bias": False,
"moe_num_experts": 64,
"moe_layer_start_index": 1,
"moe_intermediate_size": 3584,
"moe_capacity": [64, 64, 64],
"moe_gate": "topk",
"moe_k": 4,
"moe_layer_interval": 1,
"moe_use_aux_free": True,
"num_nextn_predict_layers": 1,
"tie_word_embeddings": False,
"is_quantized": False,
# NOTE: Uncomment to run quantized inference.
# "quantization_config": {
# "dense_quant_type": "block_wise_fp8",
# "moe_quant_type": "block_wise_fp8",
# "kv_cache_quant_type": "float8_e4m3fn",
# "quantization": "mix_quant",
# },
"max_model_len": 2 * (9000 + 128),
"num_attention_heads": 12,
"num_key_value_heads": 4,
"num_hidden_layers": 39,
}
model_dir = tempfile.mkdtemp(prefix="tmp_model_config_")
config_path = os.path.join(model_dir, "config.json")
@@ -158,41 +131,66 @@ class TestAttentionPerformance(unittest.TestCase):
print(f"Successfully created config.json at: {config_path}")
return model_dir
def create_fd_config_from_model_path(self, model_path, tensor_parallel_size=1):
"""Creates a complete FDConfig from a model path."""
model_args = {"model": model_path, "dtype": "bfloat16"}
model_config = ModelConfig(model_args)
model_config.tensor_parallel_size = tensor_parallel_size
parallel_config = ParallelConfig({"tensor_parallel_size": tensor_parallel_size, "data_parallel_size": 1})
cache_config = CacheConfig(
{
"block_size": 64,
"model_cfg": model_config,
"tensor_parallel_size": tensor_parallel_size,
}
)
return FDConfig(
model_config=model_config,
cache_config=cache_config,
parallel_config=parallel_config,
scheduler_config=SchedulerConfig({}),
load_config=LoadConfig({}),
quant_config=MixQuantConfig(
dense_quant_type="block_wise_fp8", moe_quant_type="block_wise_fp8", kv_cache_quant_type="float8_e4m3fn"
),
graph_opt_config=GraphOptimizationConfig({}),
commit_config=CommitConfig(),
device_config=DeviceConfig({}),
speculative_config=SpeculativeConfig({}),
early_stop_config=EarlyStopConfig({}),
)
def create_random_attention_state_dict(self, fd_config: FDConfig, prefix: str) -> dict:
"""
Creates a state_dict with random weights for the Ernie4_5_Attention layer.
"""
print("INFO: Creating random weights for testing...")
with paddle.no_grad():
hidden_size = fd_config.model_config.hidden_size
tp_size = fd_config.parallel_config.tensor_parallel_size
tensor_dtype = getattr(paddle, fd_config.model_config.dtype)
hidden_size = fd_config.model_config.hidden_size
tp_size = fd_config.parallel_config.tensor_parallel_size
tensor_dtype = getattr(paddle, fd_config.model_config.dtype)
q_dims = fd_config.model_config.num_attention_heads * fd_config.model_config.head_dim
kv_dims = fd_config.model_config.num_key_value_heads * fd_config.model_config.head_dim
total_output_dim = q_dims + 2 * kv_dims
qkv_proj_output_dim_tp = total_output_dim // tp_size
qkv_weight_shape = [hidden_size, qkv_proj_output_dim_tp]
q_dims = fd_config.model_config.num_attention_heads * fd_config.model_config.head_dim
kv_dims = fd_config.model_config.num_key_value_heads * fd_config.model_config.head_dim
total_output_dim = q_dims + 2 * kv_dims
qkv_proj_output_dim_tp = total_output_dim // tp_size
qkv_weight_shape = [hidden_size, qkv_proj_output_dim_tp]
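# Fused QKV projection: the output width covers the Q heads plus 2x the KV heads, split across tensor-parallel ranks.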
o_proj_input_dim = fd_config.model_config.num_attention_heads * fd_config.model_config.head_dim
o_proj_input_dim_tp = o_proj_input_dim // tp_size
o_proj_weight_shape = [o_proj_input_dim_tp, hidden_size]
o_proj_input_dim = fd_config.model_config.num_attention_heads * fd_config.model_config.head_dim
o_proj_input_dim_tp = o_proj_input_dim // tp_size
o_proj_weight_shape = [o_proj_input_dim_tp, hidden_size]
qkv_weight = paddle.randn(qkv_weight_shape, dtype=tensor_dtype)
o_proj_weight = paddle.randn(o_proj_weight_shape, dtype=tensor_dtype)
qkv_weight = paddle.randn(qkv_weight_shape, dtype=tensor_dtype)
o_proj_weight = paddle.randn(o_proj_weight_shape, dtype=tensor_dtype)
kv_num_heads_tp = (
fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size
)
activation_scale_shape = [kv_num_heads_tp]
activation_scale_tensor = paddle.full(shape=activation_scale_shape, fill_value=1.0, dtype=tensor_dtype)
kv_num_heads_tp = fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size
activation_scale_shape = [kv_num_heads_tp]
activation_scale_tensor = paddle.full(shape=activation_scale_shape, fill_value=1.0, dtype=tensor_dtype)
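# One activation scale per local KV head, fixed at 1.0 for this random-weight test; consumed by the cache-K/V quantization matmuls.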
state_dict = {
f"{prefix}.qkv_proj.weight": qkv_weight,
f"{prefix}.o_proj.weight": o_proj_weight,
f"{prefix}.cachek_matmul.activation_scale": activation_scale_tensor,
f"{prefix}.cachev_matmul.activation_scale": activation_scale_tensor,
}
state_dict = {
f"{prefix}.qkv_proj.weight": qkv_weight,
f"{prefix}.o_proj.weight": o_proj_weight,
f"{prefix}.cachek_matmul.activation_scale": activation_scale_tensor,
f"{prefix}.cachev_matmul.activation_scale": activation_scale_tensor,
}
return state_dict
def create_forward_meta(
@@ -202,11 +200,7 @@ class TestAttentionPerformance(unittest.TestCase):
mode: ForwardMode,
fd_config: FDConfig,
attn_backend: AttentionBackend,
past_kv_len: int = 0,
existing_caches: list[paddle.Tensor] | None = None,
existing_block_tables: paddle.Tensor | None = None,
use_dynamic_quant: bool = False,
free_blocks_pool: list[int] | None = None,
) -> ForwardMeta:
"""
Creates a high-fidelity ForwardMeta object.
@@ -217,7 +211,7 @@ class TestAttentionPerformance(unittest.TestCase):
seq_lens_this_time = seq_lens_encoder
elif mode == ForwardMode.DECODE:
seq_lens_encoder = paddle.zeros([batch_size], dtype="int32")
seq_lens_decoder = paddle.full([batch_size], past_kv_len, dtype="int32")
seq_lens_decoder = paddle.full([batch_size], seq_len, dtype="int32")
seq_lens_this_time = paddle.ones([batch_size], dtype="int32")
else:
raise ValueError(f"Unsupported ForwardMode: {mode}")
@@ -233,49 +227,32 @@ class TestAttentionPerformance(unittest.TestCase):
block_size=fd_config.cache_config.block_size,
)
if existing_caches is None:
block_size = fd_config.cache_config.block_size
max_model_len = fd_config.model_config.max_model_len
num_blocks_per_seq = (max_model_len + block_size - 1) // block_size
num_blocks = num_blocks_per_seq * batch_size
head_dim = fd_config.model_config.head_dim
kv_num_heads_tp = (
fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size
)
num_layers = fd_config.model_config.num_hidden_layers
cache_type = fd_config.model_config.dtype
block_size = fd_config.cache_config.block_size
max_model_len = fd_config.model_config.max_model_len
num_blocks_per_seq = (max_model_len + block_size - 1) // block_size
num_blocks = num_blocks_per_seq * batch_size
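# Reserve enough cache blocks for every sequence in the batch to grow to max_model_len tokens.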
head_dim = fd_config.model_config.head_dim
kv_num_heads_tp = fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size
num_layers = fd_config.model_config.num_hidden_layers
cache_type = fd_config.model_config.dtype
if use_dynamic_quant:
cache_type = "uint8"
cache_shape = (num_blocks, kv_num_heads_tp, block_size, head_dim)
scale_shape = (num_blocks, kv_num_heads_tp, block_size)
caches = []
for _ in range(num_layers):
key_cache = paddle.randint(0, 255, shape=cache_shape, dtype="int32").cast(cache_type)
value_cache = paddle.randint(0, 255, shape=cache_shape, dtype="int32").cast(cache_type)
caches.extend([key_cache, value_cache])
if use_dynamic_quant:
cache_type = "uint8"
cache_shape = (num_blocks, kv_num_heads_tp, block_size, head_dim)
scale_shape = (num_blocks, kv_num_heads_tp, block_size)
caches = []
for _ in range(num_layers):
key_cache = paddle.randint(0, 255, shape=cache_shape, dtype="int32").cast(cache_type)
value_cache = paddle.randint(0, 255, shape=cache_shape, dtype="int32").cast(cache_type)
caches.extend([key_cache, value_cache])
if use_dynamic_quant:
key_cache_scale = paddle.rand(shape=scale_shape, dtype=fd_config.model_config.dtype)
value_cache_scale = paddle.rand(shape=scale_shape, dtype=fd_config.model_config.dtype)
caches.extend([key_cache_scale, value_cache_scale])
else:
caches = existing_caches
key_cache_scale = paddle.rand(shape=scale_shape, dtype=fd_config.model_config.dtype)
value_cache_scale = paddle.rand(shape=scale_shape, dtype=fd_config.model_config.dtype)
caches.extend([key_cache_scale, value_cache_scale])
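# Each layer contributes a K and a V cache; with dynamic quantization they are uint8 (FP8) buffers followed by per-token scale tensors in the model dtype.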
if existing_block_tables is None:
block_size = fd_config.cache_config.block_size
max_model_len = fd_config.model_config.max_model_len
num_blocks_per_seq = (max_model_len + block_size - 1) // block_size
if free_blocks_pool is None:
total_blocks_for_this_run = num_blocks_per_seq * batch_size
free_blocks_pool = list(range(total_blocks_for_this_run - 1, -1, -1))
block_tables = paddle.zeros(shape=(batch_size, num_blocks_per_seq), dtype="int32")
num_blocks_to_alloc = (seq_len + block_size - 1) // block_size
for i in range(batch_size):
for j in range(num_blocks_to_alloc):
if not free_blocks_pool:
raise RuntimeError("Out of free blocks during test setup!")
block_tables[i, j] = free_blocks_pool.pop()
else:
block_tables = existing_block_tables
block_tables = paddle.zeros(shape=(batch_size, num_blocks_per_seq), dtype="int32")
for i in range(batch_size):
for j in range(num_blocks_per_seq):
block_tables[i, j] = i * num_blocks_per_seq + j
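# Deterministic block table: sequence i owns the contiguous block range [i * num_blocks_per_seq, (i + 1) * num_blocks_per_seq).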
tmp_position_ids = paddle.arange(fd_config.model_config.max_model_len).reshape((1, -1))
rope_emb = get_rope(
@@ -287,16 +264,12 @@ class TestAttentionPerformance(unittest.TestCase):
)
input_ids = paddle.zeros([batch_size, seq_len if mode == ForwardMode.EXTEND else 1], dtype="int64")
(
ids_remove_padding,
batch_id_per_token,
cu_seqlens_q,
cu_seqlens_k,
output_cum_offsets,
output_padding_offset,
) = pre_process(input_ids, seq_lens_this_time, False, None, seq_lens_encoder, seq_lens_decoder)
token_num = paddle.sum(seq_lens_this_time)
ids_remove_padding, batch_id_per_token, cu_seqlens_q, cu_seqlens_k = get_padding_offset(
input_ids, token_num, seq_lens_this_time
)
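# get_padding_offset packs the padded input_ids into a flat token stream and returns per-token batch ids plus cumulative sequence offsets (inferred from the returned names).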
meta = ForwardMeta(
forward_meta = ForwardMeta(
ids_remove_padding=ids_remove_padding,
seq_lens_encoder=seq_lens_encoder,
seq_lens_decoder=seq_lens_decoder,
@@ -314,148 +287,111 @@ class TestAttentionPerformance(unittest.TestCase):
attn_mask_offsets=None,
**attn_backend_buffers,
)
return meta, free_blocks_pool
def profile_attention_layer(
self,
title: str,
model: nn.Layer,
hidden_states: paddle.Tensor,
forward_meta: ForwardMeta,
warmup_steps: int,
test_steps: int,
):
print(f"\n--- {title} ---")
print(f"Input shape: {hidden_states.shape}")
for _ in range(warmup_steps):
_ = model(forward_meta, hidden_states)
paddle.device.cuda.synchronize()
start_time = time.time()
for _ in range(test_steps):
_ = model(forward_meta, hidden_states)
paddle.device.cuda.synchronize()
end_time = time.time()
total_time = end_time - start_time
avg_latency_ms = (total_time / test_steps) * 1000
print(f"Result: Average latency is {avg_latency_ms:.4f} ms over {test_steps} steps.")
return avg_latency_ms
def create_fd_config_from_model_path(self, model_path, tensor_parallel_size=1):
"""Creates a complete FDConfig from a model path."""
model_args = {"model": model_path, "dtype": "bfloat16"}
model_config = ModelConfig(model_args)
model_config.tensor_parallel_size = tensor_parallel_size
parallel_config = ParallelConfig({"tensor_parallel_size": tensor_parallel_size, "data_parallel_size": 1})
cache_config = CacheConfig(
{
"block_size": 64,
"gpu_memory_utilization": 0.9,
"cache_dtype": "bfloat16",
"model_cfg": model_config,
"tensor_parallel_size": tensor_parallel_size,
}
)
return FDConfig(
model_config=model_config,
cache_config=cache_config,
parallel_config=parallel_config,
scheduler_config=SchedulerConfig({}),
load_config=LoadConfig({}),
graph_opt_config=GraphOptimizationConfig({}),
commit_config=CommitConfig(),
device_config=DeviceConfig({}),
speculative_config=SpeculativeConfig({}),
early_stop_config=EarlyStopConfig({}),
plas_attention_config=None,
test_mode=True,
)
# endregion
return forward_meta
def test_decode_performance_with_prefill(self):
"""
Tests decode performance after a long prefill, using a pre-allocate,
fill, and then profile pattern.
"""
# Test parameters
warmup_steps = 10
test_steps = 100
prefill_batch_size = 1
decode_batch_size = 10 # This can be configured as needed
prefill_seq_len = 9000
decode_batch_size = 100 # This can be configured as needed
prefill_seq_len = 4096
use_dynamic_quant = True
act_tensor_dtype = paddle.bfloat16
# --- Step 1: Pre-allocate KV Cache for the max batch size ---
print(f"\n--- Step 1: Pre-allocating KV Cache for max batch size {decode_batch_size} ---")
large_meta, free_blocks_pool = self.create_forward_meta(
batch_size=decode_batch_size,
seq_len=prefill_seq_len,
mode=ForwardMode.EXTEND,
fd_config=self.fd_config,
attn_backend=self.attn_backend,
use_dynamic_quant=use_dynamic_quant,
)
print(f"Large meta created with Block Tables shape: {large_meta.block_tables.shape}")
# --- Step 2: Run Prefill to populate the first cache slot ---
print(f"\n--- Step 2: Running Prefill (BS={prefill_batch_size}, SeqLen={prefill_seq_len}) ---")
prefill_hidden_states = paddle.randn(
[prefill_batch_size * prefill_seq_len, self.fd_config.model_config.hidden_size],
dtype=act_tensor_dtype,
)
prefill_meta_view, temp_pool = self.create_forward_meta(
forward_meta = self.create_forward_meta(
batch_size=prefill_batch_size,
seq_len=prefill_seq_len,
mode=ForwardMode.EXTEND,
fd_config=self.fd_config,
attn_backend=self.attn_backend,
existing_caches=large_meta.caches,
existing_block_tables=large_meta.block_tables[:prefill_batch_size],
use_dynamic_quant=use_dynamic_quant,
free_blocks_pool=free_blocks_pool,
)
self.attn_backend.init_attention_metadata(prefill_meta_view)
with paddle.no_grad():
_ = self.attention_layer(prefill_meta_view, prefill_hidden_states)
paddle.device.cuda.synchronize()
print("Prefill complete.")
self.attn_backend.init_attention_metadata(forward_meta)
self.attn_forward(forward_meta, prefill_hidden_states)
paddle.device.synchronize()
import paddle.profiler as profiler
# p = profiler.Profiler(
# targets=[profiler.ProfilerTarget.CPU, profiler.ProfilerTarget.GPU],
# on_trace_ready=profiler.export_chrome_tracing("./profile_log"),
# )
# p.start()
# p.step()
start_events = [paddle.device.cuda.Event(enable_timing=True) for _ in range(test_steps)]
end_events = [paddle.device.cuda.Event(enable_timing=True) for _ in range(test_steps)]
for i in range(test_steps):
start_events[i].record()
self.attn_forward(forward_meta, prefill_hidden_states)
end_events[i].record()
paddle.device.synchronize()
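# Drop the first sample below ([1:]) so one-off warmup cost does not skew the reported timings.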
times = np.array([round(s.elapsed_time(e), 1) for s, e in zip(start_events, end_events)])[1:]
print(times[-5:])
# p.stop()
# --- Step 3: Profile Decode performance on all copies ---
print(f"\n--- Step 3: Profiling Decode (BS={decode_batch_size}) ---")
decode_hidden_states = paddle.randn(
[decode_batch_size * 1, self.fd_config.model_config.hidden_size], dtype=act_tensor_dtype
[decode_batch_size, self.fd_config.model_config.hidden_size], dtype=act_tensor_dtype
)
decode_meta, _ = self.create_forward_meta(
forward_meta = self.create_forward_meta(
batch_size=decode_batch_size,
seq_len=1,
seq_len=5000,
mode=ForwardMode.DECODE,
fd_config=self.fd_config,
attn_backend=self.attn_backend,
past_kv_len=prefill_seq_len,
existing_caches=large_meta.caches,
existing_block_tables=large_meta.block_tables,
use_dynamic_quant=use_dynamic_quant,
free_blocks_pool=temp_pool,
)
self.attn_backend.init_attention_metadata(decode_meta)
self.attn_backend.init_attention_metadata(forward_meta)
self.profile_attention_layer(
f"Decode Perf (BS={decode_batch_size} after 1x{prefill_seq_len}-token Prefill)",
self.attention_layer,
decode_hidden_states,
decode_meta,
warmup_steps,
test_steps,
p = profiler.Profiler(
targets=[profiler.ProfilerTarget.CPU, profiler.ProfilerTarget.GPU],
on_trace_ready=profiler.export_chrome_tracing("./profile_log"),
)
p.start()
p.step()
paddle.device.synchronize()
# Must run one warmup pass first, because the pre-processing is now deferred into the first layer!
self.attn_forward(forward_meta, decode_hidden_states)
attn_cuda_graphs = graphs.CUDAGraph()
attn_cuda_graphs.capture_begin()
self.attn_forward(forward_meta, decode_hidden_states)
attn_cuda_graphs.capture_end()
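# Replaying the captured graph re-issues the recorded kernels on the same tensors, removing Python and kernel-launch overhead from the timed loop.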
start_events = [paddle.device.cuda.Event(enable_timing=True) for _ in range(test_steps)]
end_events = [paddle.device.cuda.Event(enable_timing=True) for _ in range(test_steps)]
for i in range(test_steps):
start_events[i].record()
attn_cuda_graphs.replay()
# self.attn_forward(forward_meta, decode_hidden_states)
end_events[i].record()
paddle.device.synchronize()
times = np.array([round(s.elapsed_time(e), 1) for s, e in zip(start_events, end_events)])[1:]
print(times[-5:])
p.stop()
if __name__ == "__main__":
unittest.main()