"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import json
import os
import random
import numpy as np
import paddle
import paddle.distributed.fleet as fleet
from safetensors import safe_open
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
from fastdeploy.input.mm_processor import DataProcessor
from fastdeploy.model_executor.layers.attention import get_attention_backend
from fastdeploy.model_executor.layers.rotary_embedding import get_rope_3d
from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata
from fastdeploy.model_executor.layers.sample.sampler import Sampler
from fastdeploy.model_executor.models.ernie4_5_moe import \
Ernie4_5_PretrainedModel
from fastdeploy.model_executor.models.ernie4_5_vl.configuration import \
Ernie4_5_VLMoeConfig
from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope import \
DFNRopeVisionTransformerConfig
from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope.modeling import \
DFNRopeVisionTransformerPretrainedModel
from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import (
ScatterOp, VariableResolutionResamplerModel)
from fastdeploy.model_executor.models.utils import load_checkpoint
from fastdeploy.platforms import current_platform
from fastdeploy.worker.forward_meta import ForwardMeta
from fastdeploy.worker.utils import check_safetensors_model
from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase
if current_platform.is_cuda() and current_platform.available():
from fastdeploy.model_executor.layers.utils import (
remove_padding, speculate_remove_padding)
from fastdeploy.model_executor.ops.gpu import (save_output,
set_stop_value_multi_ends,
set_value_by_flags_and_idx,
update_inputs)
class GPUVLModelRunner(VLModelRunnerBase):
def __init__(self, config, args, nranks, rank):
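"""Set up distributed/tensor-parallel info, tokenizer and model paths, AMP op lists, shared inputs, and the sampler."""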
self.nranks = nranks
self.rank = rank
hcg = fleet.get_hybrid_communicate_group()
self.tensor_parallel_degree = max(hcg.get_model_parallel_world_size(),
1)
self.tensor_parallel_rank = hcg.get_model_parallel_rank()
self.mp_src_rank = hcg.get_model_parallel_group_src_rank()
self.mp_group = hcg.get_model_parallel_group()
self.is_safetensors_model = check_safetensors_model(
args.model_name_or_path)
model_path = os.path.dirname(args.model_name_or_path)
args.llm_model_name_or_path = args.model_name_or_path
if not self.is_safetensors_model:
args.tokenizer = args.image_preprocessor = model_path
else:
args.tokenizer = args.image_preprocessor = args.model_name_or_path
args.vision_model_name_or_path = os.path.join(
model_path, "DFNRopeVisionTransformer")
self.amp_black = [
"reduce_sum",
"c_softmax_with_cross_entropy",
"elementwise_div",
"sin",
"cos",
"sort",
"multinomial",
]
self.amp_white = [
"lookup_table",
"lookup_table_v2",
"flash_attn",
"matmul",
"matmul_v2",
"fused_gemm_epilogue",
]
super().__init__(config, args)
self.init_extra_input(config, args)
self._reset_paddle_env()
self.sampler = Sampler()
def _reset_paddle_env(self):
# Placeholder for Paddle environment flags (e.g. FLAGS_gqa_use_tensorcore,
# FLAGS_ffn2_use_hardamard) that may be set for GQA and related kernels.
pass
def update_chunked_prefill(self, tasks):
"""
Update chunked-prefill related inputs for the given tasks.
"""
if not self.args.enable_chunked_prefill:
return
for task in tasks:
if task.chunk_idx > len(task.prefill_chunk_info):
continue
idx = task.idx
if task.chunk_idx == len(task.prefill_chunk_info):
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = 1
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = 0
self.share_inputs["seq_lens_decoder"][idx:idx +
1] = task.start_idx
self.share_inputs["step_idx"][idx:idx + 1] = 1
else:
inputs = self._preprocess_task(
task.prefill_chunk_info[task.chunk_idx])
if inputs.get("images") is not None:
self.share_inputs[
"image_features"] = self.extract_vision_features(
inputs)
else:
# Handle requests without images or videos.
self.share_inputs["image_features"] = None
token_chunk_size = inputs["input_ids"].shape[1]
self.share_inputs["input_ids"][
idx:idx + 1, :token_chunk_size] = inputs["input_ids"]
self.share_inputs["seq_lens_this_time"][idx:idx +
1] = token_chunk_size
self.share_inputs['seq_lens_encoder'][idx:idx +
1] = token_chunk_size
self.share_inputs["seq_lens_decoder"][idx:idx +
1] = task.start_idx
self.share_inputs["step_idx"][idx:idx + 1] = 0
task.start_idx += token_chunk_size
task.chunk_idx += 1
def _load_model(self, model_name, dynamic_load_weight):
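"""Load the tokenizer, model config, vision tower, resampler, and LLM weights, then build the attention backend and KV cache."""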
vocab_file_names = [
"tokenizer.model", "spm.model", "ernie_token_100k.model"
]
for i in range(len(vocab_file_names)):
if os.path.exists(
os.path.join(self.args.tokenizer, vocab_file_names[i])):
ErnieBotTokenizer.resource_files_names[
"vocab_file"] = vocab_file_names[i]
break
tokenizer = ErnieBotTokenizer.from_pretrained(
self.args.tokenizer,
model_max_length=self.args.max_model_len,
padding_side="right",
use_fast=False,
)
tokenizer.ignored_index = -100
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.unk_token
config = Ernie4_5_VLMoeConfig.from_pretrained(
self.args.llm_model_name_or_path,
tensor_parallel_degree=self.tensor_parallel_degree,
tensor_parallel_rank=self.tensor_parallel_rank,
moe_group="dummy",
)
self.model_cfg = config
if self.is_safetensors_model:
meta_json = os.path.join(self.args.model_name_or_path,
"model.safetensors.index.json")
if os.path.exists(meta_json):
with open(
os.path.join(self.args.model_name_or_path,
"model.safetensors.index.json"),
"r") as f:
self.weight_map = json.load(f)["weight_map"]
else:
self.weight_map = {}
with safe_open(os.path.join(self.args.model_name_or_path,
"model.safetensors"),
framework="np") as f:
keys = f.keys()
for k in keys:
self.weight_map[k] = "model.safetensors"
if self.is_safetensors_model:
vision_config = config.vision_config
vision_config.tensor_parallel_degree = self.tensor_parallel_degree
vision_config.tensor_parallel_rank = self.tensor_parallel_rank
vision_config.attn_sep = False
vision_config.dtype = "bfloat16"
else:
vision_config = DFNRopeVisionTransformerConfig.from_pretrained(
self.args.vision_model_name_or_path,
tensor_parallel_degree=self.tensor_parallel_degree,
tensor_parallel_rank=self.tensor_parallel_rank,
attn_sep=False,
dtype="bfloat16",
)
config.vision_config = vision_config
self.vision_config = vision_config
config.pixel_hidden_size = config.vision_config.hidden_size
config.im_patch_id = tokenizer.get_vocab()["<|IMAGE_PLACEHOLDER|>"]
config.think_end_id = tokenizer.get_vocab()["</think>"]
config.max_text_id = config.im_patch_id
config.sequence_parallel = False
self.dtype = self.args.dtype
paddle.set_default_dtype(self.dtype)
self.vision_model, self.resampler_model = self.inject_pp_vision_model(
self.args, config)
processor = DataProcessor(
tokenizer_name=self.args.tokenizer,
image_preprocessor_name=str(self.args.image_preprocessor),
)
processor.eval()
image_preprocess = processor.image_preprocessor
image_preprocess.image_mean_tensor = paddle.to_tensor(
image_preprocess.image_mean, dtype="float32").reshape([1, 3, 1, 1])
image_preprocess.image_std_tensor = paddle.to_tensor(
image_preprocess.image_std, dtype="float32").reshape([1, 3, 1, 1])
image_preprocess.rescale_factor = paddle.to_tensor(
image_preprocess.rescale_factor, dtype="float32")
image_preprocess.image_mean_tensor = image_preprocess.image_mean_tensor.squeeze(
[-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1,
-1)
image_preprocess.image_std_tensor = image_preprocess.image_std_tensor.squeeze(
[-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1,
-1)
self.image_preprocess = image_preprocess
fd_config, self.model = build_stream_line_model(
self.args.model_name_or_path,
self.args.dtype,
self.args.block_size,
max_model_len=self.args.max_model_len,
tokenizer=tokenizer,
quantization=self.args.quantization,
)
self.model.eval()
self.set_state_dict(self.args)
fd_config.parallel_config.max_model_len = fd_config.model_config.max_seq_len
self.fd_config = fd_config
attn_backend_cls = get_attention_backend(self.args.attention_backend)
num_heads = self.fd_config.model_config.num_attention_heads // \
self.fd_config.parallel_config.tensor_parallel_degree
self.fd_config.model_config.kv_num_heads = int(
self.fd_config.model_config.num_key_value_heads
) // self.fd_config.parallel_config.tensor_parallel_degree
head_dim = self.fd_config.model_config.head_dim
self.attn_backend = attn_backend_cls(
self.fd_config,
kv_num_heads=self.fd_config.model_config.kv_num_heads,
num_heads=num_heads,
head_dim=head_dim)
self._init_kvcache()
def init_extra_input(self, config, args):
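"""Allocate multimodal-specific buffers in share_inputs (rope_emb, image_features, and thinking-mode flags)."""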
head_dim = self.model_cfg.head_dim
self.share_inputs.update({
"rope_emb":
paddle.full(shape=[
args.max_num_seqs, 2, 1, self.max_length, 1, head_dim // 2
],
fill_value=0,
dtype="float32")
})
self.share_inputs.update({"image_features": None})
self.share_inputs.update({
"need_think_end": paddle.full(shape=[
args.max_num_seqs, 1],
fill_value=0,
dtype="int32")
})
self.share_inputs.update({
"enable_thinking": paddle.full(shape=[1],
fill_value=True,
dtype="bool")
})
self.share_inputs.update({
"reasoning_index": paddle.full(shape=[
args.max_num_seqs, 1],
fill_value=0,
dtype="int32")
})
def init_rotary_position_embedding(self, max_model_len):
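"""No-op here; 3D rotary embeddings are built per request in prepare_rope3d."""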
pass
def _init_kvcache(self):
"""
Share the KV-cache buffers without copying data.
"""
cache_kvs = {}
total_block_num = self.num_gpu_blocks
num_layers = self.model_cfg.get("num_layers",
None) or self.model_cfg.get(
"num_hidden_layers", None)
kv_num_head = self.model_cfg.get(
"num_key_value_heads",
self.model_cfg.num_attention_heads,
)
kv_num_head = kv_num_head // self.tensor_parallel_degree
self.model_cfg.kv_num_head = kv_num_head
for i in range(num_layers):
cache_type = self.args.dtype
cache_kvs["key_caches_{}".format(i)] = paddle.full(
shape=[
total_block_num,
kv_num_head,
self.args.block_size,
self.model_cfg.head_dim,
],
fill_value=0,
dtype=cache_type,
)
cache_kvs["value_caches_{}".format(i)] = paddle.full(
shape=[
total_block_num,
kv_num_head,
self.args.block_size,
self.model_cfg.head_dim,
],
fill_value=0,
dtype=cache_type,
)
self.share_inputs["caches"] = list(cache_kvs.values())
for value in cache_kvs.values():
del value
paddle.device.cuda.empty_cache()
def clear_parameters(self, pid):
""" clear_parameters """
if "caches" in self.share_inputs:
self.model.clear_parameters(pid)
del self.share_inputs["caches"]
paddle.device.cuda.empty_cache()
self.model.log_memory_usage("clear all memory")
def update_parameters(self, pid):
""" update_parameters """
if "caches" not in self.share_inputs:
self.model.update_parameters(pid)
self._init_kvcache()
self.model.log_memory_usage("update all memory")
@paddle.no_grad()
def set_state_dict(self, args):
"""set_state_dict"""
if not self.is_safetensors_model:
rank_model_paths = []
for root, dirs, files in os.walk(self.args.llm_model_name_or_path):
for file in files:
if file == f"model_state.tp0{self.tensor_parallel_rank}.pdparams":
rank_model_paths.append(os.path.join(root, file))
elif file == "model_state.pdparams":
rank_model_paths.append(os.path.join(root, file))
state_dict = {}
for path in rank_model_paths:
loaded_dict = paddle.load(path, return_numpy=True)
state_dict.update(loaded_dict)
resampler_state = {}
for key in list(state_dict.keys()):
if "vision" in key:
state_dict.pop(key)
if key.startswith("ernie.resampler_model."):
value = state_dict.pop(key)
value = paddle.to_tensor(value).cast("bfloat16")
value = value.numpy()
resampler_state[
key[len("ernie.resampler_model."):]] = value
elif key.startswith("resampler_model."):
value = state_dict.pop(key)
value = paddle.to_tensor(value).cast("bfloat16")
value = value.numpy()
resampler_state[key[len("resampler_model."):]] = value
self.model.set_state_dict(state_dict)
self.resampler_model.set_state_dict(resampler_state)
else:
state_dict = load_checkpoint(
args.model_name_or_path,
Ernie4_5_PretrainedModel,
self.model_cfg,
return_numpy=True,
)
for key in list(state_dict.keys()):
if key.startswith("vision_model.") or key.startswith(
"ernie.resampler_model."):
state_dict.pop(key)
self.model.set_state_dict(state_dict)
@paddle.no_grad()
def vit_load(self, model_path, tensor_parallel_degree,
tensor_parallel_rank):
"""
Load vision transformer weights for the given tensor-parallel rank.
"""
if tensor_parallel_degree == 1:
rank_model_path = os.path.join(model_path, "model_state.pdparams")
else:
rank_model_path = os.path.join(
model_path, f"model_state_tp0{tensor_parallel_rank}.pdparams")
if os.path.exists(rank_model_path):
return paddle.load(rank_model_path, return_numpy=True)
else:
raise ValueError(f"No such a file {rank_model_path}")
@paddle.no_grad()
def inject_pp_vision_model(self, args, cfg):
"""
Build the vision model and resampler and load their (tensor-parallel) weights.
"""
def set_vision_state_dict(model,
tensor_parallel_degree=8,
tensor_parallel_rank=0,
name=""):
model_state_dict = model.state_dict()
compat_keys = [name + k for k in model_state_dict.keys()]
model_files = set()
for k in compat_keys:
if k in self.weight_map.keys():
model_files.add(
os.path.join(args.model_name_or_path,
self.weight_map[k]))
state_dict = {}
for model_file in model_files:
with safe_open(model_file, framework="np") as f:
for k in f.keys():
if k in compat_keys:
new_k = k.replace(name, "")
tensor = f.get_tensor(k)
if tensor_parallel_degree > 1:
if "resampler_model" in name and new_k == "spatial_linear.0.weight":
tensor = np.split(
tensor, tensor_parallel_degree,
axis=0)[tensor_parallel_rank]
elif name == "vision_model.":
if "attn.proj.weight" in new_k or "fc2.weight" in new_k:
tensor = np.split(
tensor,
tensor_parallel_degree,
axis=0)[tensor_parallel_rank]
elif "fc1.weight" in new_k or "fc1.bias" in new_k:
tensor = np.split(
tensor,
tensor_parallel_degree,
axis=-1)[tensor_parallel_rank]
elif "qkv.weight" in new_k:
head_dim = self.vision_config.hidden_size // self.vision_config.num_heads
tensor = tensor.reshape([
self.vision_config.hidden_size, 3,
self.vision_config.num_heads,
head_dim
])
tensor = np.split(
tensor,
tensor_parallel_degree,
axis=-2
)[tensor_parallel_rank].reshape([
self.vision_config.hidden_size, -1
])
elif "qkv.bias" in new_k:
head_dim = self.vision_config.hidden_size // self.vision_config.num_heads
tensor = tensor.reshape([
3, self.vision_config.num_heads,
head_dim
])
tensor = np.split(
tensor,
tensor_parallel_degree,
axis=-2
)[tensor_parallel_rank].reshape([-1])
state_dict[new_k] = tensor
model.set_state_dict(state_dict)
vision_model = DFNRopeVisionTransformerPretrainedModel(
cfg.vision_config)
vision_model = paddle.amp.decorate(models=vision_model,
level="O2",
dtype="bfloat16")
vision_model.eval()
if not self.is_safetensors_model:
vit_state_dict = self.vit_load(args.vision_model_name_or_path,
self.tensor_parallel_degree,
self.tensor_parallel_rank)
vision_model.set_state_dict(vit_state_dict)
else:
set_vision_state_dict(
vision_model,
tensor_parallel_degree=self.tensor_parallel_degree,
tensor_parallel_rank=self.tensor_parallel_rank,
name="vision_model.",
)
resampler_model = VariableResolutionResamplerModel(
cfg.pixel_hidden_size,
cfg.hidden_size,
cfg.spatial_conv_size,
cfg.temporal_conv_size,
config=cfg,
)
resampler_model = paddle.amp.decorate(models=resampler_model,
level="O2",
dtype="bfloat16")
resampler_model.eval()
if self.is_safetensors_model:
is_ernie_begin = False
for k in self.weight_map.keys():
if k.startswith("ernie.resampler_model."):
is_ernie_begin = True
set_vision_state_dict(
resampler_model,
tensor_parallel_degree=self.tensor_parallel_degree,
tensor_parallel_rank=self.tensor_parallel_rank,
name="ernie.resampler_model."
if is_ernie_begin else "resampler_model.",
)
return vision_model, resampler_model
@paddle.no_grad()
def extract_vision_features(self, inputs):
"""extract_vision_features"""
assert inputs["images"] is not None
grid_thw = inputs["grid_thw"]
images = inputs["images"].cast("float32")
images = self.image_preprocess.rescale_factor * images - self.image_preprocess.image_mean_tensor
images = images / self.image_preprocess.image_std_tensor
images = images.cast("bfloat16")
token_type_ids = inputs["token_type_ids"]
token_type_ids_w_video = token_type_ids
input_ids = inputs["input_ids"]
# convert to img patch id
image_mask = input_ids == self.model_cfg.im_patch_id
image_type_ids = inputs["image_type_ids"]
with paddle.amp.auto_cast(
True,
custom_black_list=self.amp_black,
custom_white_list=self.amp_white,
level="O2",
dtype=self.dtype,
):
image_features = self.vision_model.extract_feature(
images, grid_thw)
if self.tensor_parallel_degree > 1:
S, C = image_features.shape
image_features = image_features.reshape(
[-1, C * self.model_cfg.spatial_conv_size**2])
image_features = ScatterOp.apply(image_features,
axis=-1)  # scatter features across model-parallel ranks
image_features = image_features.reshape([S, -1])
image_features = self.resampler_model(
image_features,
image_mask,
token_type_ids_w_video,
image_type_ids,
grid_thw,
)
return image_features
@paddle.no_grad()
def prepare_rope3d(self, position_ids, **kwargs):
"""prepare_rope3d"""
prefix_max_position_ids = paddle.max(position_ids) + 1
dec_pos_ids = paddle.tile(
paddle.arange(kwargs["max_length"],
dtype="int64").unsqueeze(0).unsqueeze(-1), [1, 1, 3])
dec_pos_ids = dec_pos_ids + prefix_max_position_ids
position_ids_3d_real = paddle.concat([position_ids, dec_pos_ids],
axis=1)
rope_emb = get_rope_3d(
position_ids=position_ids_3d_real,
rotary_dim=self.model_cfg.head_dim,
paritial_rotary_factor=1.0,
base=self.model_cfg.rope_theta,
max_position=self.args.max_model_len,
freq_allocation=self.model_cfg.freq_allocation,
)
return rope_emb
def prefill_finished(self):
"""
Check whether the prefill stage has finished for all requests.
"""
prefill_status = (self.share_inputs["seq_lens_this_time"] != 0) & (
self.share_inputs["seq_lens_this_time"] != 1)
return not paddle.any(prefill_status).numpy()
def dy_input_preprocess(self, tasks):
"""
Prepare share_inputs for newly (dynamically) inserted requests.
"""
def get_numeric_value(task, key, default_value):
if task.get(key, None) is not None:
return task.get(key)
else:
return default_value
for i in range(len(tasks)):
task = tasks[i]
idx = task.idx
kwargs = {
"max_length":
get_numeric_value(task, "max_tokens", 2048),
"top_p":
get_numeric_value(task, "top_p", 0.8),
"temperature":
get_numeric_value(task, "temperature", 0.2),
"top_k":
get_numeric_value(task, "top_k", 0),
"penalty_score":
get_numeric_value(task, "repetition_penalty", 1.0),
"frequency_score":
get_numeric_value(task, "frequency_penalty", 0.0),
"presence_score":
get_numeric_value(task, "presence_penalty", 0.0),
"decode_strategy":
"sampling",
"pad_token_id":
self.args.pad_token_id,
"enable_thinking":
get_numeric_value(task, "enable_thinking", True),
"reasoning_max_tokens":
get_numeric_value(task, "reasoning_max_tokens", 2048),
}
if self.args.enable_chunked_prefill:
task.set("chunk_idx", 1)
inputs = self._preprocess_task(task.prefill_chunk_info[0])
if inputs.get("images") is not None:
self.share_inputs[
"image_features"] = self.extract_vision_features(
inputs)
else:
# Handle requests without images or videos.
self.share_inputs["image_features"] = None
if task.multimodal_inputs["position_ids"] is not None:
position_ids = paddle.to_tensor(
task.multimodal_inputs["position_ids"],
dtype="int64").unsqueeze([0])
else:
position_ids = None
token_chunk_size = inputs["input_ids"].shape[1]
task.set("start_idx", token_chunk_size)
self.share_inputs["input_ids"][
idx:idx + 1, :token_chunk_size] = inputs["input_ids"]
self.share_inputs["seq_lens_this_time"][idx:idx +
1] = token_chunk_size
self.share_inputs["seq_lens_encoder"][idx:idx +
1] = token_chunk_size
self.share_inputs["step_seq_lens_encoder"][
idx:idx + 1] = token_chunk_size
else:
inputs = self._preprocess_task(task.multimodal_inputs)
if inputs.get("images") is not None:
self.share_inputs[
"image_features"] = self.extract_vision_features(
inputs)
else:
# Handle requests without images or videos.
self.share_inputs["image_features"] = None
position_ids = inputs["position_ids"]
length = inputs["input_ids"].shape[1]
self.share_inputs["input_ids"][
idx:idx + 1, :length] = inputs["input_ids"]
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = length
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = length
self.share_inputs["step_seq_lens_encoder"][idx:idx +
1] = length
# Thinking-mode bookkeeping, used later to force a </think> token once the reasoning budget is spent.
self.share_inputs["enable_thinking"][:] = kwargs["enable_thinking"]
self.share_inputs["need_think_end"][idx:idx +
1, :] = 1 if kwargs["enable_thinking"] else 0
self.share_inputs["reasoning_index"][idx:idx + 1, :] = kwargs["reasoning_max_tokens"]
self.share_inputs["rope_emb"][idx:idx +
1, :] = self.prepare_rope3d(
position_ids, **kwargs)
self.share_inputs["top_p"][idx:idx + 1] = kwargs["top_p"]
self.share_inputs["temperature"][idx:idx +
1] = kwargs["temperature"]
self.share_inputs["eos_token_id"][:] = np.array(
task.eos_token_ids).astype("int64").reshape(-1, 1)
self.share_inputs["penalty_score"][idx:idx +
1] = kwargs["penalty_score"]
self.share_inputs["frequency_score"][idx:idx +
1] = kwargs["frequency_score"]
self.share_inputs["presence_score"][idx:idx +
1] = kwargs["presence_score"]
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
self.share_inputs["step_idx"][idx:idx + 1] = 0
self.share_inputs["min_dec_len"][idx:idx + 1] = 1
self.share_inputs["max_dec_len"][idx:idx +
1] = kwargs["max_length"]
self.share_inputs["stop_flags"][idx:idx + 1] = False
self.share_inputs["pre_ids"][idx:idx + 1] = -1
encoder_block_num = len(task.get("block_tables"))
self.share_inputs["encoder_block_lens"][idx:idx +
1] = encoder_block_num
self.share_inputs["block_tables"][idx:idx + 1, :] = -1
self.share_inputs["block_tables"][
idx:idx + 1, :encoder_block_num] = np.array(task.block_tables,
dtype="int32")
def pre_process(self):
"""
Remove padding, build attention forward metadata, and assemble the sampling metadata.
"""
if current_platform.is_cuda():
if self.args.speculative_method is not None:
(
ids_remove_padding,
padding_offset,
cum_offsets,
cu_seqlens_q,
cu_seqlens_k,
) = speculate_remove_padding(
max_len=self.args.max_model_len,
input_ids=self.share_inputs["input_ids"],
seq_lens_this_time=self.share_inputs["seq_lens_this_time"],
draft_tokens=self.share_inputs["draft_tokens"],
seq_lens_encoder=self.share_inputs["seq_lens_encoder"])
else:
(
ids_remove_padding,
padding_offset,
cum_offsets,
cu_seqlens_q,
cu_seqlens_k,
) = remove_padding(
max_len=self.args.max_model_len,
input_ids=self.share_inputs["input_ids"],
seq_lens_this_time=self.share_inputs["seq_lens_this_time"])
self.share_inputs["ids_remove_padding"] = ids_remove_padding
self.share_inputs["padding_offset"] = padding_offset
self.share_inputs["cum_offsets"] = cum_offsets
self.share_inputs["cu_seqlens_q"] = cu_seqlens_q
self.share_inputs["cu_seqlens_k"] = cu_seqlens_k
self.share_inputs["decoder_batch_ids"] = paddle.full(
[self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32')
self.share_inputs["decoder_tile_ids_per_batch"] = paddle.full(
[self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32')
# initialize_forward_meta
self.forward_meta = ForwardMeta.init_forward_meta(
self.share_inputs, self.attn_backend)
self.attn_backend.init_attention_metadata(self.forward_meta)
self.sampling_metadata = SamplingMetadata(
temperature=self.share_inputs["temperature"],
top_p=self.share_inputs["top_p"],
step_idx=self.share_inputs["step_idx"],
pre_token_ids=self.share_inputs["pre_ids"],
frequency_penalties=self.share_inputs["frequency_score"],
presence_penalties=self.share_inputs["presence_score"],
repetition_penalties=self.share_inputs["penalty_score"],
min_dec_lens=self.share_inputs["min_dec_len"],
bad_words_token_ids=self.share_inputs["bad_tokens"],
eos_token_ids=self.share_inputs["eos_token_id"],
)
def generate(self):
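"""Run pre-processing, one model forward pass, sampling, and post-processing."""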
self.pre_process()
hidden_states = self.model(self.share_inputs["ids_remove_padding"],
self.share_inputs["image_features"],
self.forward_meta)
logits = self.model.compute_logits(hidden_states)
set_value_by_flags_and_idx(
self.share_inputs["pre_ids"],
self.share_inputs["input_ids"],
self.share_inputs["seq_lens_this_time"],
self.share_inputs["seq_lens_encoder"],
self.share_inputs["seq_lens_decoder"],
self.share_inputs["step_idx"],
self.share_inputs["stop_flags"],
)
# sampler & save_output
next_tokens = self.sampler(logits, self.sampling_metadata)
if self.fd_config.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(next_tokens, 0)
self.post_process(next_tokens)
def post_process(self, next_tokens):
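"""Update the thinking budget, stop flags, and step indices, then persist the sampled tokens."""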
if self.share_inputs["enable_thinking"]:
exists_think_end = next_tokens == self.model_cfg.think_end_id
paddle.assign(
paddle.where(
exists_think_end,
self.share_inputs["need_think_end"] - 1,
self.share_inputs["need_think_end"],
),
self.share_inputs["need_think_end"]
)
paddle.assign(
paddle.where(
self.share_inputs["need_think_end"].cast("bool"),
self.share_inputs["reasoning_index"] - 1,
self.share_inputs["reasoning_index"],
),
self.share_inputs["reasoning_index"]
)
stop_wo_think = (
(
next_tokens == self.share_inputs["eos_token_id"]
) | (
self.share_inputs["reasoning_index"] == 0
)
) & (
self.share_inputs["need_think_end"] > 0
)
next_tokens = paddle.where(stop_wo_think, self.model_cfg.think_end_id, next_tokens)
paddle.assign(
paddle.where(
stop_wo_think,
self.share_inputs["need_think_end"] - 1,
self.share_inputs["need_think_end"],
),
self.share_inputs["need_think_end"]
)
paddle.assign(
paddle.where(
self.share_inputs["stop_flags"],
self.share_inputs["step_idx"],
self.share_inputs["step_idx"] + 1,
),
self.share_inputs["step_idx"],
)
length_cond = paddle.greater_equal(self.share_inputs["step_idx"],
self.share_inputs["max_dec_len"])
paddle.assign(
paddle.logical_or(self.share_inputs["stop_flags"], length_cond),
self.share_inputs["stop_flags"],
)
set_stop_value_multi_ends(
next_tokens,
self.share_inputs["stop_flags"],
self.share_inputs["seq_lens_this_time"],
self.share_inputs["eos_token_id"],
self.share_inputs["next_tokens"],
False,
) # multi ends
# update inputs
with paddle.framework._no_check_dy2st_diff():
update_inputs(
self.share_inputs["stop_flags"],
self.share_inputs["not_need_stop"],
self.share_inputs["seq_lens_this_time"],
self.share_inputs["seq_lens_encoder"],
self.share_inputs["seq_lens_decoder"],
self.share_inputs["input_ids"],
self.share_inputs["stop_nums"],
next_tokens,
self.share_inputs["is_block_step"],
)
save_output(
next_tokens,
self.share_inputs["not_need_stop"],
self.rank,
False, # use_ep
)
def _cal_theortical_kvcache(self):
"""
Compute the theoretical KV-cache memory size.
"""
num_layers = self.model_cfg.get("num_layers",
None) or self.model_cfg.get(
"num_hidden_layers", None)
byte_of_cache = 2
# TODO: support c8/c4 (quantized) KV caches.
hidden_dim = self.model_cfg.head_dim * self.model_cfg.kv_num_head
theoretical_kv_cache_memory = (2 * byte_of_cache *
self.args.block_size * num_layers *
hidden_dim)
return theoretical_kv_cache_memory
def _update_share_input_block_num(self):
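"""Rebuild the KV caches, block tables, and free block list after the GPU block count changes."""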
num_gpu_blocks = self.num_gpu_blocks
del self.share_inputs["caches"]
self._init_kvcache()
del self.share_inputs["block_tables"]
self.share_inputs["block_tables"] = paddle.full(
[self.args.max_num_seqs, num_gpu_blocks], -1, dtype="int32")
# Initialize the free block list.
free_list = list(
range(num_gpu_blocks - 1,
int(num_gpu_blocks * self.args.kv_cache_ratio) - 1, -1))
self.free_list_len = len(free_list)
self.share_inputs.update({
"free_list":
paddle.to_tensor(free_list, dtype="int32"),
"free_list_len":
paddle.full([1], self.free_list_len, dtype="int32"),
})
def dummy_input(self, num_total_tokens, number_of_tasks):
"""
Construct fake inputs for memory profiling.
"""
input_length = min(num_total_tokens // number_of_tasks,
self.args.max_model_len - 10)
block_num = (input_length + self.args.block_size - 1 ) // self.args.block_size \
+ self.args.enc_dec_block_num
self.share_inputs["free_list"] = paddle.to_tensor([], dtype="int32")
self.share_inputs["free_list_len"][0] = 0
for i in range(number_of_tasks):
idx = i
self.share_inputs["input_ids"][idx:idx +
1, :input_length] = np.array(
[5] * input_length)
self.share_inputs["eos_token_id"][:] = np.array(
[2], dtype="int64").reshape(-1, 1)
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length
self.share_inputs["step_seq_lens_encoder"][idx:idx +
1] = input_length
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
self.share_inputs["step_idx"][idx:idx + 1] = 0
self.share_inputs["max_dec_len"][idx:idx + 1] = 10
self.share_inputs["stop_flags"][idx:idx + 1] = False
self.share_inputs["first_token_ids"][
idx:idx + 1] = self.share_inputs["input_ids"][idx:idx + 1, :1]
self.share_inputs["ori_seq_lens_encoder"][idx:idx +
1] = input_length
self.share_inputs["infer_seed"][idx:idx + 1] = random.randint(
0, 922337203685477580)
self.share_inputs["encoder_block_lens"][idx:idx + 1] = block_num
self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(idx * block_num, \
(idx + 1) * block_num, 1)
def _preprocess_task(self, one):
"""process batch"""
input_ids = one["input_ids"][np.newaxis, :]
input_ids = paddle.to_tensor(input_ids, dtype=paddle.int64)
token_type_ids = one["token_type_ids"][np.newaxis, :]
token_type_ids = paddle.to_tensor(token_type_ids, dtype=paddle.int64)
if one["images"] is not None:
image_type_ids = one["image_type_ids"][np.newaxis, :]
images = one["images"]
image_type_ids = paddle.to_tensor(image_type_ids,
dtype=paddle.int64)
images = paddle.to_tensor(images, dtype="uint8")
grid_thw = paddle.to_tensor(one["grid_thw"], dtype="int64")
else:
image_type_ids = None
images = None
grid_thw = None
if one["position_ids"] is not None:
position_ids = paddle.to_tensor(one["position_ids"],
dtype="int64").unsqueeze([0])
else:
position_ids = None
result = dict(
input_ids=input_ids,
image_type_ids=image_type_ids,
token_type_ids=token_type_ids,
position_ids=position_ids,
grid_thw=grid_thw,
images=images,
)
return result
def build_stream_line_model(
model_path,
dtype,
block_size,
max_model_len,
tokenizer,
quantization: str = "None",
):
"""
Build the FDConfig and instantiate the multimodal model from the given checkpoint path.
"""
import contextlib
from paddleformers.transformers.configuration_utils import PretrainedConfig
from paddleformers.trl import llm_utils
from paddleformers.utils.log import logger
from fastdeploy.config import (DeviceConfig, FDConfig, KVCacheConfig,
LoadConfig, ModelConfig, MoEConfig,
MoEPhase, ParallelConfig, SpeculativeConfig)
from fastdeploy.model_executor.layers.quantization import \
get_quantization_config
from fastdeploy.model_executor.models.model_base import ModelRegistry
config, _ = PretrainedConfig.get_config_dict(model_path)
config["head_dim"] = config.get(
"head_dim", config["hidden_size"] // config["num_attention_heads"])
config["rope_theta"] = config.get("rope_theta", 10000.0)
rope_theta = config["rope_theta"]
model_config = ModelConfig.from_dict(config)
model_config.head_dim = config["head_dim"]
parallel_config = ParallelConfig()
speculative_config = SpeculativeConfig()
device_config = DeviceConfig()
load_config = LoadConfig()
moe_config = MoEConfig()
kv_cache_config = KVCacheConfig()
kv_cache_config.cache_quant_dtype = "none"
tensor_parallel_rank, tensor_parallel_degree = llm_utils.init_dist_env()
parallel_config.tensor_parallel_rank = tensor_parallel_rank
parallel_config.tensor_parallel_degree = tensor_parallel_degree
parallel_config.expert_parallel_degree = 1
parallel_config.expert_parallel_rank = int(tensor_parallel_rank /
tensor_parallel_degree)
parallel_config.column_cut = False
speculative_config.is_mtp = False
speculative_config.draft_type = "None"
# Note(tangbinhan): used for load_checkpoint
model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank
model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree
model_config.is_mtp = speculative_config.is_mtp
moe_config.num_experts = None
# Use the tokenizer length as the original vocab size.
ori_vocab_size = len(tokenizer)
moe_intermediate_size = config.get("moe_intermediate_size", None)
if isinstance(moe_intermediate_size, list) or isinstance(
moe_intermediate_size, tuple):
moe_intermediate_size = moe_intermediate_size[0]
num_key_value_heads = config.get("num_key_value_heads", -1)
if num_key_value_heads is None:
num_key_value_heads = -1
# Needed for RL: some models have fewer num_key_value_heads than tensor_parallel_degree, so KV heads must be replicated.
if num_key_value_heads < tensor_parallel_degree:
logger.warning(
f"key value heads num is {num_key_value_heads}, tensor parallel degree is {tensor_parallel_degree}"
)
num_key_value_heads = tensor_parallel_degree
if config.get("ffn_hidden_size", None) is not None:
ffn_hidden_size = config["ffn_hidden_size"]
elif config.get("intermediate_size", None) is not None:
ffn_hidden_size = config["intermediate_size"]
else:
ffn_hidden_size = 4 * config["hidden_size"]
if config["hidden_act"].lower() == "swiglu":
if paddle.distributed.get_world_size() > 1:
multiple_of = 8 * config["num_attention_heads"]
else:
multiple_of = 4 * config["num_attention_heads"]
ffn_hidden_size = multiple_of * (
(int(2 * ffn_hidden_size / 3) + multiple_of - 1) //
multiple_of)
num_layers = config.get("num_layers", None) or config.get(
"num_hidden_layers", None)
if num_layers is None:
raise ValueError(f"num_layers<{num_layers}> is invalid")
remove_tail_layer = config.get("remove_tail_layer")
if remove_tail_layer is True:
num_layers -= 1
elif isinstance(remove_tail_layer, int):
num_layers -= remove_tail_layer
moe_num_experts = config.get("moe_num_experts", 0)
if isinstance(moe_num_experts, list):
moe_num_experts = max(moe_num_experts)
use_moe = moe_num_experts > 0
context = contextlib.nullcontext()
if config["hidden_act"].lower() == "swiglu":
model_config.hidden_act = "swiglu"
model_config.ffn_hidden_size = ffn_hidden_size
model_config.max_seq_len = max_model_len
model_config.num_layers = num_layers
model_config.dtype = dtype
parallel_config.block_size = block_size
parallel_config.msg_queue_id = None
model_config.num_key_value_heads = num_key_value_heads
model_config.return_all_hidden_states = False
speculative_config.draft_type = "None"
model_config.start_layer_index = 0
if use_moe:
moe_config.num_experts = config.get("moe_num_experts", None)
moe_config.moe_intermediate_size = config.get("moe_intermediate_size",
None)
moe_config.top_k = config.get("moe_topk", 8)
moe_config.moe_num_shared_experts = config.get(
"moe_num_shared_experts", 0)
moe_config.moe_layer_start_index = config.get("moe_layer_start_index",
None)
moe_config.moe_layer_end_index = config.get("moe_layer_end_index",
None)
model_config.moe_phase = MoEPhase.PREFILL
model_config.ori_vocab_size = ori_vocab_size
quantization_config = config.get("quantization_config", None)
quant_config_name = None
if quantization_config is not None and quantization_config.get(
"quantization", None) is None:
raise ValueError(
"quantization_config must contain a 'quantization' key that specifies the quantization method."
)
if quantization_config is not None:
quant_config_name = quantization_config["quantization"]
quant_cls = get_quantization_config(quant_config_name)
quant_config = quant_cls.from_config(quantization_config)
elif quantization != "None":
quantization_config = {}
if use_moe and quantization == "wint4":
quantization_config["dense_quant_type"] = "wint8"
quantization_config["moe_quant_type"] = "wint4"
quant_config_name = "mix_quant"
else:
quant_config_name = quantization
quant_cls = get_quantization_config(quant_config_name)
quant_config = quant_cls.from_config(quantization_config)
else:
quant_config = None
logger.info("===========quantization_config==============")
if quant_config is not None:
logger.info(f"{quantization_config}")
else:
logger.info(
"No quantization config found; using the original weight and activation dtypes."
)
logger.info("============================================")
fd_config = FDConfig(
model_config=model_config,
parallel_config=parallel_config,
speculative_config=speculative_config,
device_config=device_config,
load_config=load_config,
moe_config=moe_config,
quant_config=quant_config,
kv_cache_config=kv_cache_config,
)
fd_config.parallel_config.max_model_len = max_model_len
fd_config.model_config.rope_theta = rope_theta
with context:
model_cls = ModelRegistry.get_class(model_config.architectures[0])
model = model_cls(fd_config)
model.eval()
return fd_config, model