diff --git a/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_impl.cuh b/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_impl.cuh index 0af59a923..0e4b98966 100644 --- a/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_impl.cuh +++ b/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_impl.cuh @@ -428,6 +428,142 @@ __global__ void append_decode_cache_T_rope_kernel( } } +template +__global__ void append_decode_cache_T_neox_partial_rope_kernel( + const T* __restrict__ qkv, // [bsz, num_heads + 2 * kv_num_heads, + // head_size] + T* __restrict__ key_cache, // [num_blocks, kv_num_heads, block_size, + // head_size // 2] + T* __restrict__ value_cache, // [num_blocks, kv_num_heads, block_size, + // head_size // 2] + T* __restrict__ qkv_out, + const int* __restrict__ block_tables, // [bsz, max_blocks_per_seq] + const int* __restrict__ cu_seqlens_q, + const int* __restrict__ seq_lens, // [bsz] + const int* __restrict__ seq_lens_encoder, // [bsz] + const float* __restrict__ cos_emb, // [2, 1, max_model_len, 1, rotary_dim/2] + const float* __restrict__ sin_emb, // [2, 1, max_model_len, 1, rotary_dim/2] + const int max_seq_len, + const int max_blocks_per_seq, + const int num_heads, + const int head_size, + const int rotary_dim, + const int block_size, + const uint32_t elem_cnt, + const int kv_num_heads, + const bool rope_3d) { + using LoadT = AlignedVector; + using LoadBiasT = AlignedVector; + using LoadKVT = AlignedVector; + constexpr int HalfVecSize = VecSize / 2; + using LoadEmbT = AlignedVector; + + LoadT left_vec, right_vec; + LoadBiasT left_bias_vec, right_bias_vec; + LoadKVT left_cache_vec, right_cache_vec; + LoadEmbT cos_emb_vec; + LoadEmbT sin_emb_vec; + + int64_t global_thread_idx = blockDim.x * blockIdx.x + threadIdx.x; + const int half_head_size = head_size / 2; + const int half_rotary_dim = rotary_dim / 2; + const int64_t hidden_size = (num_heads + 2 * kv_num_heads) * head_size; + const int64_t half_hidden_size = hidden_size / 2; + // const int64_t offset = 2 * hidden_size; + + for (int32_t linear_index = global_thread_idx * VecSize, + step = gridDim.x * blockDim.x * VecSize; + linear_index < elem_cnt; + linear_index += step) { + const int ori_bi = linear_index / half_hidden_size; + const int bias = linear_index % half_hidden_size; + const int hi = bias / half_head_size; // q + k + v + const int h_bias = bias % half_head_size; + if (hi < num_heads && h_bias >= half_rotary_dim){ + continue; + } + const int start_token_idx = cu_seqlens_q[ori_bi]; + if (seq_lens_encoder[ori_bi] > 0) return; + const int write_seq_id = seq_lens[ori_bi]; + if (write_seq_id == 0) continue; + + const int* block_table_now = nullptr; + + block_table_now = block_tables + ori_bi * max_blocks_per_seq; + const int block_idx = block_table_now[write_seq_id / block_size]; + const int block_offset = write_seq_id % block_size; + uint32_t ori_idx_left = + start_token_idx * hidden_size + hi * head_size + h_bias; + uint32_t ori_idx_right = ori_idx_left + half_head_size; + if (hi < num_heads){ + ori_idx_right = ori_idx_left + half_rotary_dim; + }else if (hi < num_heads + kv_num_heads){ + if (h_bias < half_rotary_dim){ + ori_idx_right = ori_idx_left + half_rotary_dim; + }else{ + ori_idx_left = ori_idx_left + half_rotary_dim; + ori_idx_right = ori_idx_left + half_rotary_dim; + } + } + + Load(&qkv[ori_idx_left], &left_vec); + Load(&qkv[ori_idx_right], &right_vec); + + if (hi < num_heads + kv_num_heads) { + // q k rope + const uint32_t emb_idx = write_seq_id * half_rotary_dim + h_bias; + uint32_t new_emb_idx = rope_3d ? emb_idx + ori_bi * max_seq_len * head_size * 2 : emb_idx; + if (h_bias < half_rotary_dim){ + Load(&cos_emb[new_emb_idx], &cos_emb_vec); + Load(&sin_emb[new_emb_idx], &sin_emb_vec); + } + } +#pragma unroll + for (int i = 0; i < VecSize; i++) { + // rope + float input_left = static_cast(left_vec[i]); + float input_right = static_cast(right_vec[i]); + if (hi < num_heads + kv_num_heads && h_bias < half_rotary_dim) { + const float cos_tmp = cos_emb_vec[i]; + const float sin_tmp = sin_emb_vec[i]; + left_bias_vec[i] = + static_cast(input_left * cos_tmp - input_right * sin_tmp); + right_bias_vec[i] = + static_cast(input_right * cos_tmp + input_left * sin_tmp); + } else { + left_bias_vec[i] = static_cast(input_left); + right_bias_vec[i] = static_cast(input_right); + } + } + if (hi < num_heads) { + // write q + Store(left_bias_vec, &qkv_out[ori_idx_left]); + Store(right_bias_vec, &qkv_out[ori_idx_right]); + } else { + // write k/v + const uint32_t kv_head_idx = (hi - num_heads) % kv_num_heads; + uint32_t tgt_idx_left = + block_idx * kv_num_heads * block_size * head_size + + kv_head_idx * block_size * head_size + block_offset * head_size + + h_bias; + uint32_t tgt_idx_right = tgt_idx_left + half_head_size; + if (hi < num_heads + kv_num_heads) { + if (h_bias < half_rotary_dim) { + tgt_idx_right = tgt_idx_left + half_rotary_dim; + }else{ + tgt_idx_left = tgt_idx_left + half_rotary_dim; + tgt_idx_right = tgt_idx_left + half_rotary_dim; + } + Store(left_bias_vec, &key_cache[tgt_idx_left]); + Store(right_bias_vec, &key_cache[tgt_idx_right]); + } else { + Store(left_bias_vec, &value_cache[tgt_idx_left]); + Store(right_bias_vec, &value_cache[tgt_idx_right]); + } + } + } +} + template __global__ void append_decode_cache_T_neox_rope_kernel( const T* __restrict__ qkv, // [bsz, num_heads + 2 * kv_num_heads, diff --git a/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_kernel.cu b/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_kernel.cu index 63780e419..8eb70571a 100644 --- a/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_kernel.cu +++ b/custom_ops/gpu_ops/append_attn/decoder_write_cache_with_rope_kernel.cu @@ -94,6 +94,7 @@ void append_decode_cache_rope(const QKV_TYPE* qkv, const int num_heads, const int kv_num_heads, const int dim_head, + const int rotary_dim, const int block_size, const int bsz, const cudaStream_t& stream, @@ -133,7 +134,29 @@ void append_decode_cache_rope(const QKV_TYPE* qkv, kv_num_heads, rope_3d); } else { - append_decode_cache_T_neox_rope_kernel + if (rotary_dim < dim_head){ + append_decode_cache_T_neox_partial_rope_kernel + <<>>(reinterpret_cast(qkv), + key_cache, + value_cache, + qkv_out, + block_tables, + cu_seqlens_q, + seq_lens, + seq_lens_encoder, + cos_emb, + sin_emb, + max_seq_len, + max_blocks_per_seq, + num_heads, + dim_head, + rotary_dim, + block_size, + elem_nums, + kv_num_heads, + rope_3d); + }else{ + append_decode_cache_T_neox_rope_kernel <<>>(reinterpret_cast(qkv), key_cache, value_cache, @@ -152,6 +175,7 @@ void append_decode_cache_rope(const QKV_TYPE* qkv, elem_nums, kv_num_heads, rope_3d); + } } } else { if (qkv_out_scales) { @@ -516,11 +540,20 @@ void DecoderWriteCacheWithRoPEKernel( const float* cos_emb = rotary_embs ? rotary_embs.get().data() : nullptr; const float* sin_emb; + int rotary_dim = dim_head; if (rotary_embs) { sin_emb = use_neox_rotary_style ? rotary_embs.get().data() + max_seq_len * dim_head : rotary_embs.get().data() + max_seq_len * dim_head / 2; + rotary_dim = rotary_embs.get().dims()[rotary_embs.get().dims().size()-1] * 2; + if(rotary_dim < dim_head){ + if (!use_neox_rotary_style || qkv_out_scales || q_norm_weight || k_norm_weight|| cache_quant_type_str != "none"){ + PADDLE_THROW(phi::errors::Fatal( + "partial_rotary_factor < 1.0 only supports neox_rotary_style=True, qkv_out_scales is None, q_norm_weight/k_norm_weight) is None, and cache_quant_type_str is 'none'.")); + } + sin_emb = rotary_embs.get().data() + max_seq_len * rotary_dim / 2; + } } if (q_norm_weight && k_norm_weight) { @@ -609,6 +642,7 @@ void DecoderWriteCacheWithRoPEKernel( num_heads, kv_num_heads, dim_head, + rotary_dim, block_size, bsz, stream, diff --git a/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_impl.cuh b/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_impl.cuh index c4e46a6e5..7fde6523f 100644 --- a/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_impl.cuh +++ b/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_impl.cuh @@ -900,6 +900,74 @@ __global__ void GQANeoxVariableLengthRotaryKernel( } } +template +__global__ void GQANeoxVariableLengthPartialRotaryKernel( + const T *qkv, + const float *cos_emb, + const float *sin_emb, + const int *batch_id_per_token, + const int *cu_seqlens_q, + const int *seq_lens, + const int *seq_lens_decoder, + const float *qkv_out_scales, + const T *qkv_biases, + T *qkv_out, + const int64_t elem_cnt, + const int q_num_head, + const int kv_num_head, + const int seq_len, + const int head_dim, + const int rotary_dim, + const bool rope_3d) { + using LoadT = AlignedVector; + using LoadEmbT = AlignedVector; + LoadT left_vec; + LoadT right_vec; + LoadEmbT cos_emb_vec; + LoadEmbT sin_emb_vec; + int64_t global_thread_idx = blockDim.x * blockIdx.x + threadIdx.x; + const int rotary_dim_half = rotary_dim / 2; + const int offset = (q_num_head + kv_num_head) * rotary_dim_half; + for (int64_t linear_index = global_thread_idx * VecSize, + step = gridDim.x * blockDim.x * VecSize; + linear_index < elem_cnt; + linear_index += step) { + const int token_idx = linear_index / offset; + const int ori_bi = batch_id_per_token[token_idx]; + if (seq_lens && seq_lens[ori_bi] == 0) continue; + const int bias = linear_index % offset; + const int hi = bias / rotary_dim_half; + const int h_bias = bias % rotary_dim_half; + + const int ori_seq_id = (token_idx - cu_seqlens_q[ori_bi]) + seq_lens_decoder[ori_bi]; + + const int emb_idx = ori_seq_id * rotary_dim_half + h_bias; + int64_t new_emb_idx = rope_3d ? emb_idx + ori_bi * head_dim * seq_len * 2 : emb_idx; + const int base_idx_left = + token_idx * (q_num_head + 2 * kv_num_head) * head_dim + hi * head_dim + + h_bias; + const int base_idx_right = base_idx_left + rotary_dim_half; + + Load(&qkv[base_idx_left], &left_vec); + Load(&qkv[base_idx_right], &right_vec); + Load(&cos_emb[new_emb_idx], &cos_emb_vec); + Load(&sin_emb[new_emb_idx], &sin_emb_vec); +#pragma unroll + for (int i = 0; i < VecSize; i++) { + const float input_left = static_cast(left_vec[i]); + const float input_right = static_cast(right_vec[i]); + const float cos_tmp = cos_emb_vec[i]; + const float sin_tmp = sin_emb_vec[i]; + left_vec[i] = + static_cast(input_left * cos_tmp - input_right * sin_tmp); + right_vec[i] = + static_cast(input_right * cos_tmp + input_left * sin_tmp); + } + Store(left_vec, &qkv_out[base_idx_left]); + Store(right_vec, &qkv_out[base_idx_right]); + } +} + template __global__ void cache_kernel( const T *__restrict__ qkv, // [num_tokens, num_heads + 2 * kv_num_heads, @@ -2160,6 +2228,7 @@ void gqa_rotary_qk_variable( const int seq_len, const int input_output_len, const int dim_head, + const int rotary_dim, const cudaStream_t &stream, bool use_neox_style = false, bool rope_3d = false) { @@ -2240,7 +2309,38 @@ void gqa_rotary_qk_variable( dim_head, rope_3d); } else { - GQANeoxVariableLengthRotaryKernel + if (rotary_dim < dim_head){ + PD_CHECK((rotary_dim / 2) % PackSize == 0); + elem_nums = + qkv_out_scales + ? token_num * (num_heads + 2 * kv_num_heads) * rotary_dim + : token_num * (num_heads + kv_num_heads) * rotary_dim; // for all q k v + if (use_neox_style) { + elem_nums /= 2; + } + const int pack_num_new = elem_nums / PackSize; + GetNumBlocks<128>(pack_num_new, &grid_size); + GQANeoxVariableLengthPartialRotaryKernel + <<>>( + reinterpret_cast(qkv_input), + cos_emb, + rotary_emb + input_output_len * rotary_dim / 2, + batch_id_per_token, + cu_seqlens_q, + seq_lens, + seq_lens_decoder, + qkv_out_scales, + qkv_bias, + qkv_out, + elem_nums, + num_heads, + kv_num_heads, + seq_len, + dim_head, + rotary_dim, + rope_3d); + }else{ + GQANeoxVariableLengthRotaryKernel <<>>( reinterpret_cast(qkv_input), cos_emb, @@ -2258,6 +2358,7 @@ void gqa_rotary_qk_variable( seq_len, dim_head, rope_3d); + } } } } diff --git a/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_kernel.h b/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_kernel.h index b0d66a291..5ea9ce213 100644 --- a/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_kernel.h +++ b/custom_ops/gpu_ops/append_attn/encoder_write_cache_with_rope_kernel.h @@ -55,9 +55,19 @@ void EncoderWriteCacheWithRopeKernel( auto kv_num_heads = meta_data.kv_num_heads; auto head_dim = meta_data.head_dims; bool is_scale_channel_wise = false; + int rotary_dim = head_dim; if (cache_k_scale && cache_k_scale.get().dims()[0] == head_dim * kv_num_heads) { is_scale_channel_wise = true; } + if (rotary_embs){ + rotary_dim = rotary_embs.get().dims()[rotary_embs.get().dims().size()-1] * 2; + if(rotary_dim < head_dim){ + if (!use_neox_style || q_norm_weight || k_norm_weight || num_heads == kv_num_heads || is_scale_channel_wise){ + PADDLE_THROW(phi::errors::Fatal( + "partial_rotary_factor < 1.0 only supports use_neox_rotary_style=True, q_norm_weight/k_norm_weight) is None, GQA and is_scale_channel_wise=false.")); + } + } + } if (q_norm_weight && k_norm_weight) { if (num_heads != kv_num_heads && !is_scale_channel_wise && !use_neox_style) { @@ -125,6 +135,7 @@ void EncoderWriteCacheWithRopeKernel( max_seq_len, rope_3d ? rotary_embs.get().dims()[3] : rotary_embs.get().dims()[2], head_dim, + rotary_dim, stream, use_neox_style, rope_3d); diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 8400e90da..d8c5b9f50 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -133,6 +133,7 @@ class ModelConfig: self.eos_tokens_lens: int = 2 self.lm_head_fp32: bool = False self.model_format = "auto" + self.partial_rotary_factor: float = 1.0 for key, value in args.items(): if hasattr(self, key) and value != "None": setattr(self, key, value) diff --git a/fastdeploy/model_executor/layers/rotary_embedding.py b/fastdeploy/model_executor/layers/rotary_embedding.py index e52add837..1d405d7a9 100644 --- a/fastdeploy/model_executor/layers/rotary_embedding.py +++ b/fastdeploy/model_executor/layers/rotary_embedding.py @@ -73,6 +73,30 @@ class ErnieRotaryEmbedding: return rot_emb +class GlmRotaryEmbedding: + def __init__(self, rotary_dim, base, partial_rotary_factor): + """ + Pre-calculate rotary position embedding for position_ids. + """ + self.rotary_dim = rotary_dim + self.base = base + if partial_rotary_factor < 1.0: + self.rotary_dim = int(self.rotary_dim * partial_rotary_factor) + + def __call__(self, position_ids): + bsz, max_seq_len = position_ids.shape[:2] + inv_freq = self.base ** (-paddle.arange(0, self.rotary_dim, 2, dtype="float32") / self.rotary_dim) + freqs = paddle.einsum("ij,k->ijk", position_ids.cast("float32"), inv_freq) + # shape: [B, S, D/2] + rot_emb = paddle.zeros((2, bsz, max_seq_len, 1, self.rotary_dim // 2), dtype="float32") + emb = paddle.stack([freqs], axis=-1).reshape((bsz, max_seq_len, self.rotary_dim // 2)) + # shape: [B, S, 1, D] + emb = paddle.unsqueeze(emb, 2) + rot_emb[0] = paddle.cos(emb) + rot_emb[1] = paddle.sin(emb) + return rot_emb + + class QwenRotaryEmbedding: def __init__(self, rotary_dim, base, partial_rotary_factor): """ @@ -246,6 +270,9 @@ def get_rope_impl( if model_config is None or architecture.startswith("Qwen"): rotary_emb_layer = QwenRotaryEmbedding(rotary_dim, base, partial_rotary_factor) rotary_emb = rotary_emb_layer(position_ids) + elif architecture.startswith("Glm"): + rotary_emb_layer = GlmRotaryEmbedding(rotary_dim, base, partial_rotary_factor) + rotary_emb = rotary_emb_layer(position_ids) else: rotary_emb_layer = ErnieRotaryEmbedding(rotary_dim, base, partial_rotary_factor) rotary_emb = rotary_emb_layer(position_ids) diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py new file mode 100644 index 000000000..1a837cec2 --- /dev/null +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -0,0 +1,574 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +from __future__ import annotations + +import re +from functools import partial + +import paddle +from paddle import nn +from paddleformers.transformers import PretrainedModel +from paddleformers.utils.log import logger + +from fastdeploy.config import FDConfig +from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce +from fastdeploy.model_executor.forward_meta import ForwardMeta +from fastdeploy.model_executor.graph_optimization.decorator import ( + support_graph_optimization, +) +from fastdeploy.model_executor.layers.activation import SiluAndMul +from fastdeploy.model_executor.layers.attention.attention import Attention +from fastdeploy.model_executor.layers.embeddings import VocabParallelEmbedding +from fastdeploy.model_executor.layers.linear import ( + MergedColumnParallelLinear, + QKVParallelLinear, + ReplicatedLinear, + RowParallelLinear, +) +from fastdeploy.model_executor.layers.lm_head import ParallelLMHead +from fastdeploy.model_executor.layers.moe.moe import FusedMoE +from fastdeploy.model_executor.layers.normalization import RMSNorm +from fastdeploy.model_executor.models.model_base import ModelForCasualLM + + +class Glm4MoeMLP(nn.Layer): + """ """ + + def __init__( + self, + fd_config: FDConfig, + intermediate_size: int, + prefix: str = "", + reduce_results: bool = True, + ) -> None: + super().__init__() + + self.up_gate_proj = MergedColumnParallelLinear( + fd_config=fd_config, + prefix=f"{prefix}.up_gate_proj", + input_size=fd_config.model_config.hidden_size, + output_size=intermediate_size * 2, + with_bias=False, + activation=fd_config.model_config.hidden_act, + ) + + self.down_proj = RowParallelLinear( + fd_config=fd_config, + prefix=f"{prefix}.down_proj", + input_size=intermediate_size, + output_size=fd_config.model_config.hidden_size, + with_bias=False, + reduce_results=reduce_results, + ) + + self.act_fn = SiluAndMul( + fd_config=fd_config, + bias=None, + act_method=fd_config.model_config.hidden_act, + ) + + def forward(self, x): + """ """ + gate_up_out = self.up_gate_proj(x) + act_out = self.act_fn(gate_up_out) + down_out = self.down_proj(act_out) + return down_out + + +class Glm4Moe(nn.Layer): + def __init__( + self, + fd_config: FDConfig, + layer_id: int, + prefix: str = "", + ) -> None: + super().__init__() + + self.expert_parallel_size = fd_config.parallel_config.expert_parallel_size + self.tensor_parallel_size = fd_config.parallel_config.tensor_parallel_size + self.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank + self.tp_group = fd_config.parallel_config.tp_group + + self.use_ep = self.expert_parallel_size > 1 + self.use_tp = self.tensor_parallel_size > 1 + + self.n_routed_experts: int = fd_config.model_config.n_routed_experts + self.n_shared_experts: int = fd_config.model_config.n_shared_experts + + weight_key_map = { + "gate_correction_bias_key": f"{prefix}.gate.e_score_correction_bias", + "up_gate_proj_expert_weight_key": f"{prefix}.experts.{{}}.up_gate_proj.weight", + "down_proj_expert_weight_key": f"{prefix}.experts.{{}}.down_proj.weight", + } + + self.gate = ReplicatedLinear( + fd_config=fd_config, + prefix=f"{prefix}.gate", + input_size=fd_config.model_config.hidden_size, + output_size=fd_config.model_config.n_routed_experts, + with_bias=False, + skip_quant=True, + weight_dtype="float32", + ) + self.gate.e_score_correction_bias = self.create_parameter( + shape=[1, fd_config.model_config.n_routed_experts], + dtype="float32", + default_initializer=paddle.nn.initializer.Constant(0), + ) + + self.experts = FusedMoE( + fd_config, + reduce_results=False, + moe_intermediate_size=fd_config.model_config.moe_intermediate_size, + num_experts=fd_config.model_config.n_routed_experts, + top_k=fd_config.model_config.num_experts_per_tok, + topk_method="noaux_tc", + topk_group=fd_config.model_config.topk_group, + n_group=fd_config.model_config.n_group, + routed_scaling_factor=fd_config.model_config.routed_scaling_factor, + layer_idx=layer_id, + gate_correction_bias=self.gate.e_score_correction_bias, + weight_key_map=weight_key_map, + ) + + shared_experts_intermediate_size = self.n_shared_experts * fd_config.model_config.moe_intermediate_size + + self.shared_experts = Glm4MoeMLP( + fd_config=fd_config, + intermediate_size=shared_experts_intermediate_size, + prefix=f"{prefix}.shared_experts", + reduce_results=False, + ) + + def forward(self, x): + shared_experts_out = self.shared_experts(x) + out = self.experts(x, self.gate) + out = out + shared_experts_out + # We do to TP all reduce after the sum of experts. + if self.tensor_parallel_size > 1: + tensor_model_parallel_all_reduce(out) + return out + + +class Glm4MoeAttention(nn.Layer): + """ """ + + def __init__(self, fd_config: FDConfig, layer_id: int, prefix: str = "") -> None: + super().__init__() + + tp_size = fd_config.parallel_config.tensor_parallel_size + self.fd_config = fd_config + self.head_dim = fd_config.model_config.head_dim + self.num_heads = fd_config.model_config.num_attention_heads // tp_size + self.num_kv_heads = fd_config.model_config.num_key_value_heads // tp_size + self.attention_bias = fd_config.model_config.attention_bias + self.use_qk_norm = fd_config.model_config.use_qk_norm + self.q_size = self.num_heads * self.head_dim + self.kv_size = self.num_kv_heads * self.head_dim + + self.qkv_proj = QKVParallelLinear(fd_config, prefix=f"{prefix}.qkv_proj", with_bias=self.attention_bias) + + self.o_proj = RowParallelLinear( + fd_config, + prefix=f"{prefix}.o_proj", + input_size=fd_config.model_config.num_attention_heads * fd_config.model_config.head_dim, + output_size=fd_config.model_config.hidden_size, + ) + + self.attn = Attention( + fd_config, + layer_id=layer_id, + prefix=prefix, + use_neox_rotary_style=True, + rms_norm_eps=fd_config.model_config.rms_norm_eps, + ) + if self.use_qk_norm: + self.q_norm = RMSNorm( + fd_config, + hidden_size=self.head_dim, + eps=fd_config.model_config.rms_norm_eps, + prefix=f"{prefix}.q_norm", + begin_norm_axis=2, + ) + self.k_norm = RMSNorm( + fd_config, + hidden_size=self.head_dim, + eps=fd_config.model_config.rms_norm_eps, + prefix=f"{prefix}.k_norm", + begin_norm_axis=2, + ) + + def forward( + self, + forward_meta: ForwardMeta, + hidden_states: paddle.Tensor, + ): + """ """ + qkv_out = self.qkv_proj(hidden_states) + + if self.use_qk_norm: + q, k, v = qkv_out.split([self.q_size, self.kv_size, self.kv_size], axis=-1) + q = self.q_norm(q.reshape([-1, self.num_heads, self.head_dim])).reshape(q.shape) + k = self.k_norm(k.reshape([-1, self.num_kv_heads, self.head_dim])).reshape(k.shape) + qkv_out = paddle.concat([q, k, v], axis=-1) + + atten_out = self.attn( + qkv=qkv_out, + forward_meta=forward_meta, + ) + output = self.o_proj(atten_out) + return output + + +class Glm4MoeDecoderLayer(nn.Layer): + """ """ + + def __init__( + self, + fd_config: FDConfig, + prefix: str = "", + ) -> None: + super().__init__() + + layer_id = int(prefix.split(sep=".")[-1]) + self.self_attn = Glm4MoeAttention( + fd_config=fd_config, + layer_id=layer_id, + prefix=f"{prefix}.self_attn", + ) + + if ( + fd_config.model_config.n_routed_experts is not None + and layer_id >= fd_config.model_config.first_k_dense_replace + ): + self.mlp = Glm4Moe(fd_config, layer_id, prefix=f"{prefix}.mlp") + else: + self.mlp = Glm4MoeMLP( + fd_config, + intermediate_size=fd_config.model_config.intermediate_size, + prefix=f"{prefix}.mlp", + ) + + self.input_layernorm = RMSNorm( + fd_config, + hidden_size=fd_config.model_config.hidden_size, + eps=fd_config.model_config.rms_norm_eps, + prefix=f"{prefix}.input_layernorm", + ) + + self.post_attention_layernorm = RMSNorm( + fd_config, + hidden_size=fd_config.model_config.hidden_size, + eps=fd_config.model_config.rms_norm_eps, + prefix=f"{prefix}.post_attention_layernorm", + ) + + def forward( + self, + forward_meta: ForwardMeta, + hidden_states: paddle.Tensor, + residual: paddle.Tensor = None, + ): + """ """ + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + hidden_states, residual = self.input_layernorm(hidden_states, residual) + + hidden_states = self.self_attn( + hidden_states=hidden_states, + forward_meta=forward_meta, + ) + + # Fully Connected + hidden_states, residual = self.post_attention_layernorm(hidden_states, residual) + + hidden_states = self.mlp(hidden_states) + + return hidden_states, residual + + +@support_graph_optimization +class Glm4MoeModel(nn.Layer): + """ """ + + def __init__( + self, + fd_config: FDConfig = None, + ): + """ + Initializer for the Qwen2Model class. + + Args: + + """ + super().__init__() + + self.num_layers = fd_config.model_config.num_hidden_layers + fd_config.model_config.pretrained_config.prefix_name = "model" + + self.embed_tokens = VocabParallelEmbedding( + fd_config, + num_embeddings=fd_config.model_config.vocab_size, + embedding_dim=fd_config.model_config.hidden_size, + params_dtype=paddle.get_default_dtype, + prefix=(f"{fd_config.model_config.pretrained_config.prefix_name}.embed_tokens"), + ) + + self.layers = nn.LayerList( + [ + Glm4MoeDecoderLayer( + fd_config, + prefix=f"{fd_config.model_config.pretrained_config.prefix_name}.layers.{i}", + ) + for i in range(self.num_layers) + ] + ) + + self.norm = RMSNorm( + fd_config, + hidden_size=fd_config.model_config.hidden_size, + eps=fd_config.model_config.rms_norm_eps, + prefix=f"{fd_config.model_config.pretrained_config.prefix_name}.norm", + ) + + def forward( + self, + ids_remove_padding: paddle.Tensor, + forward_meta: ForwardMeta, + ): + """ """ + hidden_states = self.embed_tokens(ids_remove_padding=ids_remove_padding) + + residual = None + + for i in range(self.num_layers): + hidden_states, residual = self.layers[i](forward_meta, hidden_states, residual) + hidden_states = hidden_states + residual + + out = self.norm(hidden_states) + + return out + + +class Glm4MoeForCausalLM(ModelForCasualLM): + """ + Glm4MoeForCausalLM + """ + + def __init__(self, fd_config: FDConfig): + """ + Args: + fd_config (FDConfig): Configurations for the LLM model. + """ + super(Glm4MoeForCausalLM, self).__init__(fd_config) + + self.model = Glm4MoeModel(fd_config) + + self.ori_vocab_size = fd_config.model_config.ori_vocab_size + + self.lm_head = ParallelLMHead( + fd_config, + embedding_dim=fd_config.model_config.hidden_size, + num_embeddings=fd_config.model_config.vocab_size, + prefix="lm_head", + ) + + @classmethod + def name(self): + """ """ + return "Glm4MoeForCausalLM" + + @paddle.no_grad() + def load_weights(self, weights_iterator) -> None: + """ + Load model parameters from a given weights_iterator object. + + Args: + weights_iterator (Iterator): An iterator yielding (name, weight) pairs. + """ + + from fastdeploy.model_executor.utils import ( + default_weight_loader, + process_weights_after_loading, + ) + + stacked_params_mapping = [ + # (param_name, shard_name, shard_id) + ("qkv_proj", "q_proj", "q"), + ("qkv_proj", "k_proj", "k"), + ("qkv_proj", "v_proj", "v"), + ("up_gate_proj", "gate_proj", "gate"), + ("up_gate_proj", "up_proj", "up"), + ("embed_tokens.embeddings", "embed_tokens", None), + ("lm_head.linear", "lm_head", None), + ("experts.gate_correction_bias", "gate.e_score_correction_bias", None), + ] + # (param_name, weight_name, expert_id, shard_id) + expert_params_mapping = FusedMoE.make_expert_params_mapping( + num_experts=self.fd_config.model_config.n_routed_experts, + ckpt_gate_proj_name="gate_proj", + ckpt_down_proj_name="down_proj", + ckpt_up_proj_name="up_proj", + param_gate_up_proj_name="experts.up_gate_proj_", + param_down_proj_name="experts.down_proj_", + ) + params_dict = dict(self.named_parameters()) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + for loaded_weight_name, loaded_weight in weights_iterator: + for param_name, weight_name, shard_id in stacked_params_mapping: + if weight_name not in loaded_weight_name: + continue + if "mlp.experts" in loaded_weight_name: + continue + model_param_name = loaded_weight_name.replace(weight_name, param_name) + if model_param_name not in params_dict: + continue + param = params_dict[model_param_name] + weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config)) + weight_loader(param, loaded_weight, shard_id) + break + else: + for mapping in expert_params_mapping: + param_name, weight_name, expert_id, shard_id = mapping + if weight_name not in loaded_weight_name: + continue + model_param_name = loaded_weight_name.replace(weight_name, param_name) + if model_param_name not in params_dict: + continue + param = params_dict[model_param_name] + weight_loader = param.weight_loader + weight_loader(param, loaded_weight, shard_id=shard_id, expert_id=expert_id) + break + else: + model_param_name = loaded_weight_name + if model_param_name not in params_dict: + continue + param = params_dict[model_param_name] + weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config)) + weight_loader(param, loaded_weight) + + model_sublayer_name = re.sub(r"\.(up_gate_proj_weight|down_proj_weight|weight)$", "", model_param_name) + process_weights_after_loading_fn(model_sublayer_name, param) + + @paddle.no_grad() + def set_state_dict(self, state_dict): + """ + glm4_moe only support loader_v1. + """ + assert False, "glm4_moe only support --load_choices default_v1." + + def compute_logits(self, hidden_states: paddle.Tensor): + """ """ + logits = self.lm_head(hidden_states) + logits = logits.astype(paddle.float32) + logits[:, self.ori_vocab_size :] = -float("inf") + + return logits + + def forward( + self, + ids_remove_padding: paddle.Tensor, + forward_meta: ForwardMeta, + ): + """ """ + hidden_states = self.model(ids_remove_padding=ids_remove_padding, forward_meta=forward_meta) + + return hidden_states + + def clear_grpah_opt_backend(self): + """Clear graph optimization backend, the captured cuda graph will be cleaned""" + self.model.clear_grpah_opt_backend(fd_config=self.fd_config) + + +class Glm4MoePretrainedModel(PretrainedModel): + """ + Glm4MoePretrainedModel + """ + + config_class = FDConfig + + def _init_weight(self, layer): + """ + _init_weight + """ + return None + + @classmethod + def arch_name(self): + return "Glm4MoeForCausalLM" + + @classmethod + def _get_tensor_parallel_mappings(cls, config, is_split=True): + + logger.info("Glm4Moe inference model _get_tensor_parallel_mappings") + + from paddleformers.transformers.conversion_utils import split_or_merge_func + + fn = split_or_merge_func( + is_split=is_split, + tensor_parallel_degree=config.tensor_parallel_degree, + tensor_parallel_rank=config.tensor_parallel_rank, + num_attention_heads=config.num_attention_heads, + ) + + def get_tensor_parallel_split_mappings(num_layers): + final_actions = {} + + base_actions = { + "lm_head.weight": partial(fn, is_column=True), + "embed_tokens.weight": partial(fn, is_column=False), + "layers.0.self_attn.o_proj.weight": partial(fn, is_column=False), + } + + # Self Attention Layer which are need TP. + base_actions["layers.0.self_attn.q_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.self_attn.k_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.self_attn.v_proj.weight"] = partial(fn, is_column=True) + + # MLP Layer + base_actions["layers.0.mlp.gate_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.mlp.up_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.mlp.down_proj.weight"] = partial(fn, is_column=False) + + # Moe Layer + for expert_idx in range(config.n_routed_experts): + base_actions[f"layers.0.mlp.experts.{expert_idx}.up_proj.weight"] = partial(fn, is_column=True) + base_actions[f"layers.0.mlp.experts.{expert_idx}.gate_proj.weight"] = partial(fn, is_column=True) + base_actions[f"layers.0.mlp.experts.{expert_idx}.down_proj.weight"] = partial(fn, is_column=False) + + # Shared Expert Layer + base_actions["layers.0.mlp.shared_experts.up_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.mlp.shared_experts.gate_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.0.mlp.shared_experts.down_proj.weight"] = partial(fn, is_column=False) + + # MTP parts + base_actions["layers.46.embed_tokens.weight"] = partial(fn, is_column=False) + base_actions["layers.46.eh_proj.weight"] = partial(fn, is_column=True) + base_actions["layers.46.shared_head.head.weight"] = partial(fn, is_column=True) + + for key, action in base_actions.items(): + if "layers.0." in key: + for i in range(num_layers): + final_actions[key.replace("layers.0.", f"layers.{i}.")] = action + final_actions[key] = action + + return final_actions + + mappings = get_tensor_parallel_split_mappings(config.num_hidden_layers) + return mappings diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 09643a53f..8bb0239d5 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -857,6 +857,7 @@ class GPUModelRunner(ModelRunnerBase): position_ids=tmp_position_ids, base=self.model_config.rope_theta, model_config=self.model_config, + partial_rotary_factor=self.model_config.partial_rotary_factor, ) # Set block tables diff --git a/tests/e2e/test_fake_Glm45_AIR_serving.py b/tests/e2e/test_fake_Glm45_AIR_serving.py new file mode 100644 index 000000000..76ec5f98a --- /dev/null +++ b/tests/e2e/test_fake_Glm45_AIR_serving.py @@ -0,0 +1,216 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import shutil +import signal +import socket +import subprocess +import sys +import time + +import pytest +import requests + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] + + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + current_pid = os.getpid() + parent_pid = os.getppid() + for pid in output.splitlines(): + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except subprocess.CalledProcessError: + pass + + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + time.sleep(2) + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + print("log dir clean ") + if os.path.exists("log") and os.path.isdir("log"): + shutil.rmtree("log") + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "GLM-4.5-Air-Fake") + else: + model_path = "./GLM-4.5-Air-Fake" + + log_path = "server.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "1", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "32768", + "--max-num-seqs", + "32", + "--graph-optimization-config", + '{"use_cudagraph":true}', + "--load_choices", + "default_v1", + "--lm_head-fp32", + ] + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"API server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print(f"API server (pid={process.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for consistency testing, + including a fixed random seed and temperature. + """ + return { + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "temperature": 0.6, + "top_p": 0, # fix top_p to reduce randomness + "seed": 13, # fixed random seed + "max_tokens": 3, + "stream": False, + } + + +# ========================== +# Test for lm_head_fp32 with fixed payload +# ========================== +def test_lm_head_fp32(api_url, headers, consistent_payload): + """ + Test that two runs with the same fixed input produce similar outputs. + """ + # First request + response = requests.post(api_url, headers=headers, json=consistent_payload, timeout=300) + assert response.status_code == 200 + print(json.dumps(response.json(), indent=2, ensure_ascii=False)) + resp_json = response.json() + + # 校验返回内容与概率信息 + assert resp_json["choices"][0]["message"]["content"] == "ichertsor"