From bad53c6b6ee041218fa44a3514d54027bd33eeec Mon Sep 17 00:00:00 2001 From: bukejiyu <52310069+bukejiyu@users.noreply.github.com> Date: Sun, 13 Jul 2025 07:36:26 +0800 Subject: [PATCH] [vl]remove duplicated load logic (#2744) --- fastdeploy/config.py | 26 +- fastdeploy/model_executor/model_loader.py | 5 +- .../model_executor/models/ernie4_5_moe.py | 30 +- .../models/ernie4_5_vl/dfnrope/modeling.py | 28 +- .../models/ernie4_5_vl/ernie4_5_vl_moe.py | 250 +++++++- .../models/ernie4_5_vl/modeling_resampler.py | 22 +- fastdeploy/model_executor/models/tp_utils.py | 86 ++- fastdeploy/model_executor/models/utils.py | 30 +- fastdeploy/rl/rollout_config.py | 2 +- fastdeploy/worker/vl_gpu_model_runner.py | 540 ++---------------- fastdeploy/worker/worker_process.py | 123 ++-- 11 files changed, 510 insertions(+), 632 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 3d7b0caad..2f4f4b19d 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -18,7 +18,7 @@ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum -from typing import Literal, Optional +from typing import Literal, Optional, Union from paddleformers.transformers.configuration_utils import PretrainedConfig @@ -72,8 +72,10 @@ class ModelConfig(PretrainedConfig): rope_theta: int = 10000, rope_3d: bool = False, ori_vocab_size: int | None = None, - moe_layer_start_index: int | None = None, - moe_layer_end_index: int | None = None, + moe_layer_start_index: Union[int, list[int], None] = None, + moe_num_experts: Union[int, list[int], None] = None, + moe_layer_end_index: Union[int, list[int], None] = None, + moe_num_shared_experts: int | None = None, num_hidden_layers: int | None = None, prefix_name="", freeze_embedding=False, @@ -110,14 +112,10 @@ class ModelConfig(PretrainedConfig): self.prefix_name = prefix_name self.freeze_embedding = freeze_embedding self.rope_head_dim = rope_head_dim - moe_num_experts = kwargs.get("moe_num_experts", 0) - if moe_layer_start_index is not None: - self.moe_layer_start_index = moe_layer_start_index - elif moe_num_experts == 0: - self.moe_layer_start_index = self.num_layers - self.moe_num_experts = 0 - if moe_layer_end_index is not None: - self.moe_layer_end_index = moe_layer_end_index + self.moe_layer_start_index = moe_layer_start_index + self.moe_num_experts = moe_num_experts + self.moe_num_shared_experts = moe_num_shared_experts + self.moe_layer_end_index = moe_layer_end_index self.ffn_hidden_size = ffn_hidden_size self.rope_3d = rope_3d self.start_layer_index = start_layer_index @@ -132,15 +130,15 @@ class MoEConfig: """ Configuration for MoE. 
""" - num_experts: int = -1 + num_experts: Union[int, list[int], None] = None top_k: int = 8 moe_intermediate_size: int = -1 num_experts_per_rank: int = -1 num_experts_start_offset: int = -1 moe_num_shared_experts = (0, ) - moe_layer_start_index = 0 - moe_layer_end_index = None + moe_layer_start_index: Union[int, list[int], None] = None + moe_layer_end_index: Union[int, list[int], None] = None moe_use_aux_free: bool = False num_max_dispatch_tokens_per_rank = 256 im_patch_id = ( diff --git a/fastdeploy/model_executor/model_loader.py b/fastdeploy/model_executor/model_loader.py index cb037c2c6..8d1819840 100644 --- a/fastdeploy/model_executor/model_loader.py +++ b/fastdeploy/model_executor/model_loader.py @@ -28,6 +28,8 @@ from fastdeploy.model_executor.models.ernie4_5_moe import \ Ernie4_5_PretrainedModel from fastdeploy.model_executor.models.ernie4_5_mtp import \ Ernie4_5_MTPPretrainedModel +from fastdeploy.model_executor.models.ernie4_5_vl.ernie4_5_vl_moe import \ + Ernie4_5_VLPretrainedModel from fastdeploy.model_executor.models.model_base import ModelRegistry from fastdeploy.model_executor.models.qwen2 import Qwen2PretrainedModel from fastdeploy.model_executor.models.qwen3 import Qwen3PretrainedModel @@ -42,6 +44,7 @@ MODEL_CLASSES = { "Qwen3MoeForCausalLM": Qwen3MoePretrainedModel, "Ernie4_5_ForCausalLM": Ernie4_5_PretrainedModel, "DeepseekV3ForCausalLM": DeepSeekV3PretrainedModel, + "Ernie4_5_VLMoeForConditionalGeneration": Ernie4_5_VLPretrainedModel, } @@ -94,7 +97,7 @@ class DefaultModelLoader(BaseModelLoader): if fd_config.load_config.dynamic_load_weight: # register rl model - import fastdeploy.rl + import fastdeploy.rl # noqa architectures = architectures + "RL" with context: diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py index f36adc20e..d1fef50d0 100644 --- a/fastdeploy/model_executor/models/ernie4_5_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_moe.py @@ -26,6 +26,7 @@ from paddleformers.transformers import PretrainedModel from paddleformers.utils.log import logger from fastdeploy.config import FDConfig, ModelConfig +from fastdeploy.model_executor.forward_meta import ForwardMeta from fastdeploy.model_executor.graph_optimization.decorator import \ support_graph_optimization from fastdeploy.model_executor.layers.activation import SiluAndMul @@ -41,7 +42,6 @@ from fastdeploy.model_executor.models.tp_utils import TensorSplitMode as tsm from fastdeploy.model_executor.models.utils import \ LayerIdPlaceholder as layerid from fastdeploy.model_executor.models.utils import WeightMeta -from fastdeploy.model_executor.forward_meta import ForwardMeta class Ernie4_5_MLP(nn.Layer): @@ -599,27 +599,13 @@ class Ernie4_5_PretrainedModel(PretrainedModel): start_layer = (moe_layer_start_index if moe_layer_start_index > 0 else num_layers) final_actions = build_expanded_keys( - num_layers, - moe_num_experts, - start_layer, - base_actions, + base_actions, num_layers, start_layer, moe_num_experts ) return final_actions - - moe_num_experts = 0 - if isinstance(config.moe_num_experts, list): - moe_num_experts = sum(config.moe_num_experts) - elif isinstance(config.moe_num_experts, int): - moe_num_experts = config.moe_num_experts - - moe_layer_start_index = -1 - if isinstance(config.moe_layer_start_index, list): - moe_layer_start_index = min(config.moe_layer_start_index) - elif isinstance(config.moe_layer_start_index, int): - moe_layer_start_index = config.moe_layer_start_index - - mappings = 
get_tensor_parallel_split_mappings(config.num_layers, - moe_num_experts, - moe_layer_start_index, - config.prefix_name) + mappings = get_tensor_parallel_split_mappings( + config.num_layers, + config.moe_num_experts, + config.moe_layer_start_index, + config.prefix_name, + ) return mappings diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py index d43cc5bf3..c8e8ef2b0 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py @@ -29,6 +29,8 @@ from paddle.nn.functional.flash_attention import \ flash_attn_unpadded as flash_attn_varlen_func from paddleformers.transformers.model_utils import PretrainedModel +from fastdeploy.model_executor.layers.utils import get_tensor + from .activation import ACT2FN from .configuration import DFNRopeVisionTransformerConfig @@ -487,10 +489,10 @@ class DFNRopeVisionTransformerPretrainedModel(PretrainedModel): config_class = DFNRopeVisionTransformerConfig - def __init__(self, config) -> None: + def __init__(self, config, prefix_name: str = "") -> None: super().__init__(config) self.spatial_merge_size = config.spatial_merge_size - + self.prefix_name = prefix_name self.patch_embed = PatchEmbed( patch_size=config.patch_size, in_channels=config.in_channels, @@ -723,10 +725,18 @@ class DFNRopeVisionTransformerPretrainedModel(PretrainedModel): mappings = get_tensor_parallel_split_mappings(vision_config.depth) return mappings - def set_state_dict(self, state_dict, *args, **kwargs): - """_summary_ - - Args: - state_dict (_type_): _description_ - """ - super().set_state_dict(state_dict, *args, **kwargs) + def load_state_dict(self, state_dict): + params_dict = dict(self.named_parameters()) + for param_name, param in params_dict.items(): + state_dict_key = f"{self.prefix_name}.{param_name}" + if state_dict_key not in state_dict: + raise ValueError( + f"The key {state_dict_key} does not exist in state_dict. " + ) + tensor = get_tensor(state_dict.pop(state_dict_key)) + if param.shape != tensor.shape: + raise ValueError( + f"{state_dict_key} param.shape={param.shape} tensor.shape={tensor.shape}" + ) + else: + param.copy_(tensor, False) diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 2c5786570..b10b6b03e 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -17,14 +17,16 @@ from __future__ import annotations from dataclasses import dataclass +from functools import partial from typing import Dict, Optional, Union import numpy as np import paddle from paddle import nn +from paddleformers.transformers import PretrainedModel from paddleformers.utils.log import logger -from fastdeploy.config import FDConfig +from fastdeploy.config import FDConfig, ModelConfig from fastdeploy.distributed.communication_op import \ tensor_model_parallel_all_reduce from fastdeploy.model_executor.graph_optimization.decorator import \ @@ -467,8 +469,15 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM): fd_config (FDConfig): Configurations for the LLM model. 
""" super(Ernie4_5_VLMoeForConditionalGeneration, self).__init__(fd_config) - - self.model = Ernie4_5_VLModel(fd_config=fd_config) + # ----------- vision model ------------ + vision_config = fd_config.model_config.vision_config + self.vision_model = self._init_vision_model(vision_config) + # ----------- resampler_model ------------ + self.resampler_model = self._init_resampler_model_model( + fd_config.model_config + ) + # ernie + self.ernie = Ernie4_5_VLModel(fd_config=fd_config) self.ori_vocab_size = fd_config.model_config.ori_vocab_size @@ -480,6 +489,37 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM): ) self.tie_word_embeddings = fd_config.model_config.tie_word_embeddings + def _init_vision_model(self, vision_config) -> nn.Layer: + from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope.modeling import \ + DFNRopeVisionTransformerPretrainedModel + + vision_model = DFNRopeVisionTransformerPretrainedModel( + vision_config, prefix_name="vision_model" + ) + vision_model = paddle.amp.decorate( + models=vision_model, level="O2", dtype="bfloat16" + ) + vision_model.eval() + return vision_model + + def _init_resampler_model_model(self, model_config) -> nn.Layer: + from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import \ + VariableResolutionResamplerModel + + resampler_model = VariableResolutionResamplerModel( + model_config.pixel_hidden_size, + model_config.hidden_size, + model_config.spatial_conv_size, + model_config.temporal_conv_size, + config=model_config, + prefix_name="ernie.resampler_model", + ) + resampler_model = paddle.amp.decorate( + models=resampler_model, level="O2", dtype="bfloat16" + ) + resampler_model.eval() + return resampler_model + @classmethod def name(self): return "Ernie4_5_VLMoeForConditionalGeneration" @@ -495,10 +535,12 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM): A dictionary containing model parameters, where keys are parameter names and values are NumPy arrays or PaddlePaddle tensors. 
""" - self.model.load_state_dict(state_dict) + self.ernie.load_state_dict(state_dict) + self.vision_model.load_state_dict(state_dict) + self.resampler_model.load_state_dict(state_dict) if self.tie_word_embeddings: self.lm_head.out_linear.weight.set_value( - self.model.embeddings.word_embeddings.weight.transpose([1, 0])) + self.ernie.embeddings.word_embeddings.weight.transpose([1, 0])) else: self.lm_head.load_state_dict(state_dict) @@ -515,8 +557,204 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM): image_features: paddle.Tensor, forward_meta: ForwardMeta, ): - hidden_states = self.model(ids_remove_padding=ids_remove_padding, + hidden_states = self.ernie(ids_remove_padding=ids_remove_padding, image_features=image_features, forward_meta=forward_meta) return hidden_states + + +class Ernie4_5_VLPretrainedModel(PretrainedModel): + """ + Ernie4_5_PretrainedModel + """ + + config_class = FDConfig + + def _init_weight(self, layer): + """ + _init_weight + """ + return None + + from fastdeploy.model_executor.models.tp_utils import \ + TensorSplitMode as tsm + from fastdeploy.model_executor.models.utils import \ + LayerIdPlaceholder as layerid + from fastdeploy.model_executor.models.utils import WeightMeta + + weight_infos = [ + WeightMeta( + f".layers.{{{layerid.LAYER_ID}}}.self_attn.qkv_proj.weight", + True, + tsm.GQA, + ), + WeightMeta( + f".layers.{{{layerid.LAYER_ID}}}.self_attn.o_proj.weight", False + ), + WeightMeta( + f".layers.{{{layerid.FFN_LAYER_ID}}}.mlp.up_gate_proj.weight", + True, + tsm.PairFused, + ), + WeightMeta( + f".layers.{{{layerid.FFN_LAYER_ID}}}.mlp.down_proj.weight", False + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.experts.{{{layerid.TEXT_EXPERT_ID}}}.up_gate_proj.weight", + True, + tsm.PairFused, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.experts.{{{layerid.TEXT_EXPERT_ID}}}.down_proj.weight", + False, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.experts.{{{layerid.IMG_EXPERT_ID}}}.up_gate_proj.weight", + True, + tsm.PairFused, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.experts.{{{layerid.IMG_EXPERT_ID}}}.down_proj.weight", + False, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.shared_experts.up_gate_proj.weight", + True, + tsm.PairFused, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.shared_experts.down_proj.weight", + False, + ), + WeightMeta( + f".layers.{{{layerid.MOE_LAYER_ID}}}.mlp.shared_experts.down_proj.weight", + False, + ), + WeightMeta(".embed_tokens.weight", False), + WeightMeta("lm_head.weight", True), + ] + + weight_vison = [ + # resampler_model + WeightMeta("ernie.resampler_model.spatial_linear.0.weight", False), + WeightMeta("resampler_model.spatial_linear.0.weight", False), + # vision + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.attn.proj.weight", + False, + ), + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.mlp.fc2.weight", False + ), + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.mlp.fc1.weight", True + ), + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.mlp.fc1.bias", True + ), + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.attn.qkv.weight", + True, + tsm.GQA, + ), + WeightMeta( + f"vision_model.blocks.{{{layerid.LAYER_ID}}}.attn.qkv.bias", + True, + tsm.GQA, + ), + ] + + @classmethod + def _get_tensor_parallel_mappings(cls, config: ModelConfig, is_split=True): + """ + get_tensor_parallel_mappings + """ + logger.info("erine inference model 
_get_tensor_parallel_mappings") + from fastdeploy.model_executor.models.tp_utils import ( + build_expanded_keys, has_prefix, split_or_merge_func_v1) + + fn = split_or_merge_func_v1( + is_split=is_split, + tensor_parallel_degree=config.tensor_parallel_degree, + tensor_parallel_rank=config.tensor_parallel_rank, + num_attention_heads=config.num_attention_heads, + num_key_value_heads=config.num_key_value_heads, + head_dim=config.head_dim, + ) + vision_fn = split_or_merge_func_v1( + is_split=is_split, + tensor_parallel_degree=config.tensor_parallel_degree, + tensor_parallel_rank=config.tensor_parallel_rank, + num_attention_heads=config.vision_config.num_heads, + num_key_value_heads=config.vision_config.num_heads, + head_dim=config.vision_config.hidden_size + // config.vision_config.num_heads, + ) + + def get_tensor_parallel_split_mappings( + num_layers: int, + moe_num_experts: list[int], + moe_layer_start_index: int, + prefix_name: str, + ): + base_actions = {} + for weight_name, is_column, extra in cls.weight_infos: + params = { + "is_column": is_column, + **({extra.value: True} if extra else {}), + } + + if "lm_head.weight" or "" in weight_name: + key = weight_name + elif not has_prefix(prefix_name, weight_name): + key = f"{prefix_name}{weight_name}" + else: + key = weight_name + base_actions[key] = partial(fn, **params) + final_actions = {} + final_actions = build_expanded_keys( + base_actions, + num_layers, + ( + moe_layer_start_index + if moe_layer_start_index > 0 + else num_layers + ), + text_num_experts=moe_num_experts[0], + img_num_experts=moe_num_experts[1], + ) + return final_actions + + def get_vison_parallel_split_mappings(num_layers: int): + base_actions = {} + for weight_name, is_column, extra in cls.weight_vison: + params = { + "is_column": is_column, + **({extra.value: True} if extra else {}), + } + base_actions[weight_name] = partial(vision_fn, **params) + final_actions = {} + final_actions = build_expanded_keys( + base_actions, + num_layers, + ) + return final_actions + + moe_layer_start_index = -1 + if isinstance(config.moe_layer_start_index, list): + moe_layer_start_index = min(config.moe_layer_start_index) + elif isinstance(config.moe_layer_start_index, int): + moe_layer_start_index = config.moe_layer_start_index + + mappings = get_tensor_parallel_split_mappings( + config.num_layers, + config.moe_num_experts, + moe_layer_start_index, + config.prefix_name, + ) + vision_mappings = get_vison_parallel_split_mappings( + config.vision_config.depth + ) + + return {**mappings, **vision_mappings} diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py index 4d2a94f32..830abc2a4 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py @@ -23,7 +23,8 @@ from paddle import nn from paddle.autograd import PyLayer from paddle.distributed.fleet.utils import recompute -from fastdeploy.model_executor.layers.utils import _set_var_distributed +from fastdeploy.model_executor.layers.utils import (_set_var_distributed, + get_tensor) from fastdeploy.model_executor.models.ernie4_5_vl.dist_utils import ( RowSequenceParallelLinear, all_gather_group, reduce_scatter_group, scatter_axis) @@ -138,7 +139,7 @@ class VariableResolutionResamplerModel(nn.Layer): """ def __init__(self, in_dim, out_dim, spatial_conv_size, temporal_conv_size, - config): + config, prefix_name: str = ""): super().__init__() self.in_dim = in_dim 
self.out_dim = out_dim @@ -148,6 +149,7 @@ class VariableResolutionResamplerModel(nn.Layer): self.use_recompute_resampler = config.use_recompute_resampler self.use_temporal_conv = config.use_temporal_conv self.tensor_parallel_degree = config.tensor_parallel_degree + self.prefix_name = prefix_name # for 空间四合一 self.spatial_dim = self.in_dim * self.spatial_conv_size * self.spatial_conv_size @@ -369,6 +371,22 @@ class VariableResolutionResamplerModel(nn.Layer): x = x[:-num_pad] return x + def load_state_dict(self, state_dict): + params_dict = dict(self.named_parameters()) + for param_name, param in params_dict.items(): + state_dict_key = f"{self.prefix_name}.{param_name}" + if state_dict_key not in state_dict: + raise ValueError( + f"The key {state_dict_key} does not exist in state_dict. " + ) + tensor = get_tensor(state_dict.pop(state_dict_key)) + if param.shape != tensor.shape: + raise ValueError( + f"{state_dict_key} param.shape={param.shape} tensor.shape={tensor.shape}" + ) + else: + param.copy_(tensor, False) + @classmethod def _get_tensor_parallel_mappings(cls, config, is_split=True): diff --git a/fastdeploy/model_executor/models/tp_utils.py b/fastdeploy/model_executor/models/tp_utils.py index 493201fa7..b7be6ff4d 100644 --- a/fastdeploy/model_executor/models/tp_utils.py +++ b/fastdeploy/model_executor/models/tp_utils.py @@ -38,19 +38,23 @@ def check_tensor_parallel_prerequisites( """check_tensor_parallel_prerequisites""" if fd_config.parallel_config.tensor_parallel_degree > 1: tensor_parallel_map = cls._get_tensor_parallel_mappings( - fd_config.model_config, is_split=True) + fd_config.model_config, is_split=True + ) if not tensor_parallel_map: - logger.error("filtered_quant_map should not be empty. \ + logger.error( + "filtered_quant_map should not be empty. \ parallel splitting required, but _get_tensor_parallel_mappings is not implemented." - ) - filtered_tp_keys = cls._resolve_prefix_keys(tensor_parallel_map.keys(), - safetensor_keys) + ) + filtered_tp_keys = cls._resolve_prefix_keys( + tensor_parallel_map.keys(), safetensor_keys + ) for k, v in filtered_tp_keys.items(): tensor_parallel_filtered_map[v] = tensor_parallel_map.pop(k) if not tensor_parallel_filtered_map: - logger.error("tensor_parallel_filtered_map should not be empty. \ + logger.error( + "tensor_parallel_filtered_map should not be empty. \ The weights required for tensor parallel splitting are inconsistent with the model's weights." 
- ) + ) def extract_prefix(weight_name: str) -> str: @@ -99,7 +103,14 @@ def update_final_actions(params, final_actions, key, action): final_actions[new_key] = action -def build_expanded_keys(num_layers, num_experts, start_layer, base_actions): +def build_expanded_keys( + base_actions, + num_layers, + start_layer: int = -1, + num_experts: int = 0, + text_num_experts: int = 0, + img_num_experts: int = 0, +): """build_expanded_keys""" final_actions = {} for key, action in base_actions.items(): @@ -116,6 +127,8 @@ def build_expanded_keys(num_layers, num_experts, start_layer, base_actions): action, ) elif LayerIdPlaceholder.FFN_LAYER_ID.value in placeholders: + if start_layer < 0: + continue for layer_id in range(start_layer): update_final_actions( {LayerIdPlaceholder.FFN_LAYER_ID.value: layer_id}, @@ -123,22 +136,65 @@ def build_expanded_keys(num_layers, num_experts, start_layer, base_actions): key, action, ) - elif (LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders - and LayerIdPlaceholder.EXPERT_ID.value in placeholders): + elif ( + LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders + and LayerIdPlaceholder.EXPERT_ID.value in placeholders + ): + if start_layer < 0: + continue for layer_id in range(start_layer, num_layers): for export_id in range(num_experts): update_final_actions( { - LayerIdPlaceholder.MOE_LAYER_ID.value: - layer_id, + LayerIdPlaceholder.MOE_LAYER_ID.value: layer_id, LayerIdPlaceholder.EXPERT_ID.value: export_id, }, final_actions, key, action, ) - elif (LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders - and len(placeholders) == 1): + elif ( + LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders + and LayerIdPlaceholder.TEXT_EXPERT_ID.value in placeholders + ): + if start_layer < 0: + continue + for layer_id in range(start_layer, num_layers): + for export_id in range(text_num_experts): + update_final_actions( + { + LayerIdPlaceholder.MOE_LAYER_ID.value: layer_id, + LayerIdPlaceholder.TEXT_EXPERT_ID.value: export_id, + }, + final_actions, + key, + action, + ) + elif ( + LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders + and LayerIdPlaceholder.IMG_EXPERT_ID.value in placeholders + ): + if start_layer < 0: + continue + for layer_id in range(start_layer, num_layers): + for export_id in range( + text_num_experts, text_num_experts + img_num_experts + ): + update_final_actions( + { + LayerIdPlaceholder.MOE_LAYER_ID.value: layer_id, + LayerIdPlaceholder.IMG_EXPERT_ID.value: export_id, + }, + final_actions, + key, + action, + ) + elif ( + LayerIdPlaceholder.MOE_LAYER_ID.value in placeholders + and len(placeholders) == 1 + ): + if start_layer < 0: + continue for layer_id in range(start_layer, num_layers): update_final_actions( {LayerIdPlaceholder.MOE_LAYER_ID.value: layer_id}, @@ -147,7 +203,7 @@ def build_expanded_keys(num_layers, num_experts, start_layer, base_actions): action, ) else: - logger.error(f"{key} does not match any case.") + raise ValueError(f"{key} does not match any case.") return final_actions diff --git a/fastdeploy/model_executor/models/utils.py b/fastdeploy/model_executor/models/utils.py index c792c07d3..0bf40611e 100644 --- a/fastdeploy/model_executor/models/utils.py +++ b/fastdeploy/model_executor/models/utils.py @@ -28,9 +28,7 @@ from typing import NamedTuple, Optional import numpy as np import paddle -import paddle.distributed as dist from paddle.common_ops_import import convert_dtype -from paddle.distributed import fleet from paddleformers.transformers.model_utils import _add_variant from paddleformers.transformers.utils import 
paddleformers_load from paddleformers.utils.env import (PADDLE_WEIGHTS_INDEX_NAME, @@ -50,7 +48,8 @@ class LayerIdPlaceholder(str, enum.Enum): FFN_LAYER_ID = "ffn_layer_id" MOE_LAYER_ID = "moe_layer_id" EXPERT_ID = "export_id" - + TEXT_EXPERT_ID = "text_export_id" + IMG_EXPERT_ID = "img_export_id" class WeightMeta(NamedTuple): """ @@ -272,31 +271,6 @@ def load_prefix_weights( return past_key_values -def init_distributed_env() -> tuple[int, int]: - """init distributed envs, and only support mp in ErnieBotModel - - Returns: - tuple[int, int]: tensor_parallel_degree, tensor_parallel_rank - """ - tensor_parallel_degree = dist.get_world_size() - tensor_parallel_rank = 0 - - if tensor_parallel_degree > 1: - strategy = fleet.DistributedStrategy() - strategy.hybrid_configs = { - "dp_degree": 1, - "mp_degree": tensor_parallel_degree, - "pp_degree": 1, - "sharding_degree": 1, - } - - fleet.init(is_collective=True, strategy=strategy) - hcg = fleet.get_hybrid_communicate_group() - tensor_parallel_rank = hcg.get_model_parallel_rank() - - return tensor_parallel_degree, tensor_parallel_rank - - def w4a8_weight_convert(state_dict): """W4A8 权重转换函数 Args: diff --git a/fastdeploy/rl/rollout_config.py b/fastdeploy/rl/rollout_config.py index 4e9de922c..7176d21d4 100644 --- a/fastdeploy/rl/rollout_config.py +++ b/fastdeploy/rl/rollout_config.py @@ -107,4 +107,4 @@ class RolloutModelConfig: def initialize(self): """Initialize the final fd config""" - return initialize_fd_config(self) + return initialize_fd_config(self, self.tensor_parallel_size, 0) diff --git a/fastdeploy/worker/vl_gpu_model_runner.py b/fastdeploy/worker/vl_gpu_model_runner.py index 5ad5c0f72..ac54fb218 100644 --- a/fastdeploy/worker/vl_gpu_model_runner.py +++ b/fastdeploy/worker/vl_gpu_model_runner.py @@ -14,39 +14,26 @@ # limitations under the License. 
""" import argparse -import json import os import random -from typing import Optional import numpy as np import paddle import paddle.distributed.fleet as fleet -from paddleformers.transformers.model_utils import load_tp_checkpoint -from safetensors import safe_open -from fastdeploy.config import (DeviceConfig, FDConfig, GraphOptimizationConfig, - KVCacheConfig, LoadConfig, ModelConfig, - MoEConfig, MoEPhase, ParallelConfig, - SpeculativeConfig) +from fastdeploy.config import ModelConfig from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer from fastdeploy.input.mm_processor import DataProcessor +from fastdeploy.model_executor.forward_meta import ForwardMeta from fastdeploy.model_executor.layers.attention import get_attention_backend from fastdeploy.model_executor.layers.rotary_embedding import get_rope_3d from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata from fastdeploy.model_executor.layers.sample.sampler import Sampler -from fastdeploy.model_executor.models.ernie4_5_moe import \ - Ernie4_5_PretrainedModel from fastdeploy.model_executor.models.ernie4_5_vl.configuration import \ Ernie4_5_VLMoeConfig -from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope import \ - DFNRopeVisionTransformerConfig -from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope.modeling import \ - DFNRopeVisionTransformerPretrainedModel -from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ( - ScatterOp, VariableResolutionResamplerModel) +from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import \ + ScatterOp from fastdeploy.platforms import current_platform -from fastdeploy.model_executor.forward_meta import ForwardMeta from fastdeploy.worker.output import SamplerOutput from fastdeploy.worker.utils import check_safetensors_model from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase @@ -169,6 +156,34 @@ class GPUVLModelRunner(VLModelRunnerBase): task.start_idx += token_chunk_size task.chunk_idx += 1 + def _init_image_preprocess(self, vision_config) -> None: + processor = DataProcessor( + tokenizer_name=self.args.tokenizer, + image_preprocessor_name=str(self.args.image_preprocessor), + ) + processor.eval() + image_preprocess = processor.image_preprocessor + image_preprocess.image_mean_tensor = paddle.to_tensor( + image_preprocess.image_mean, dtype="float32" + ).reshape([1, 3, 1, 1]) + image_preprocess.image_std_tensor = paddle.to_tensor( + image_preprocess.image_std, dtype="float32" + ).reshape([1, 3, 1, 1]) + image_preprocess.rescale_factor = paddle.to_tensor( + image_preprocess.rescale_factor, dtype="float32" + ) + image_preprocess.image_mean_tensor = ( + image_preprocess.image_mean_tensor.squeeze( + [-2, -1] + ).repeat_interleave(vision_config.patch_size**2 * 1, -1) + ) + image_preprocess.image_std_tensor = ( + image_preprocess.image_std_tensor.squeeze( + [-2, -1] + ).repeat_interleave(vision_config.patch_size**2 * 1, -1) + ) + return image_preprocess + def _load_model( self, model_name: str, @@ -198,98 +213,41 @@ class GPUVLModelRunner(VLModelRunnerBase): if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.unk_token - config = Ernie4_5_VLMoeConfig.from_pretrained( - self.args.llm_model_name_or_path, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - moe_group="dummy", - ) - self.model_cfg = config - if self.is_safetensors_model: - meta_json = os.path.join(self.args.model_name_or_path, - "model.safetensors.index.json") - if os.path.exists(meta_json): 
- with open( - os.path.join(self.args.model_name_or_path, - "model.safetensors.index.json"), - "r") as f: - self.weight_map = json.load(f)["weight_map"] - else: - self.weight_map = {} - with safe_open(os.path.join(self.args.model_name_or_path, - "model.safetensors"), - framework="np") as f: - keys = f.keys() - for k in keys: - self.weight_map[k] = "model.safetensors" - - if self.is_safetensors_model: - vision_config = config.vision_config - vision_config.tensor_parallel_degree = self.tensor_parallel_degree - vision_config.tensor_parallel_rank = self.tensor_parallel_rank - vision_config.attn_sep = False - vision_config.dtype = "bfloat16" - else: - vision_config = DFNRopeVisionTransformerConfig.from_pretrained( - self.args.vision_model_name_or_path, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - attn_sep=False, - dtype="bfloat16", - ) - config.vision_config = vision_config - self.vision_config = vision_config - config.pixel_hidden_size = config.vision_config.hidden_size - config.im_patch_id = tokenizer.get_vocab()["<|IMAGE_PLACEHOLDER|>"] - config.think_end_id = tokenizer.get_vocab()[""] - config.max_text_id = config.im_patch_id - - config.sequence_parallel = False - self.dtype = self.args.dtype paddle.set_default_dtype(self.dtype) - self.vision_model, self.resampler_model = self.inject_pp_vision_model( - self.args, config) + from fastdeploy.worker.worker_process import initialize_fd_config - processor = DataProcessor( - tokenizer_name=self.args.tokenizer, - image_preprocessor_name=str(self.args.image_preprocessor), + fd_config = initialize_fd_config( + self.args, self.tensor_parallel_degree, self.tensor_parallel_rank ) - processor.eval() - image_preprocess = processor.image_preprocessor - image_preprocess.image_mean_tensor = paddle.to_tensor( - image_preprocess.image_mean, dtype="float32").reshape([1, 3, 1, 1]) - image_preprocess.image_std_tensor = paddle.to_tensor( - image_preprocess.image_std, dtype="float32").reshape([1, 3, 1, 1]) - image_preprocess.rescale_factor = paddle.to_tensor( - image_preprocess.rescale_factor, dtype="float32") - image_preprocess.image_mean_tensor = image_preprocess.image_mean_tensor.squeeze( - [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, - -1) - image_preprocess.image_std_tensor = image_preprocess.image_std_tensor.squeeze( - [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, - -1) - self.image_preprocess = image_preprocess - - graph_opt_config = GraphOptimizationConfig( - self.args.enable_static_graph_inference, self.args.use_cudagraph, - self.args.max_capture_batch_size) - - fd_config, self.model = build_stream_line_model( - self.args.model_name_or_path, - self.args.dtype, - self.args.block_size, - max_model_len=self.args.max_model_len, - tokenizer=tokenizer, - quantization=self.args.quantization, - graph_opt_config=graph_opt_config, + fd_config.model_config = Ernie4_5_VLMoeConfig( + **fd_config.model_config.__dict__ ) - self.model.eval() - self.set_state_dict(self.args) - - fd_config.parallel_config.max_model_len = fd_config.model_config.max_seq_len + fd_config.parallel_config.column_cut = False + vision_config = fd_config.model_config.vision_config + vision_config.attn_sep = False + vision_config.dtype = "bfloat16" + vision_config.tensor_parallel_degree = self.tensor_parallel_degree + vision_config.tensor_parallel_rank = self.tensor_parallel_rank + fd_config.model_config.pixel_hidden_size = vision_config.hidden_size + fd_config.model_config.im_patch_id = 
tokenizer.get_vocab()[ + "<|IMAGE_PLACEHOLDER|>" + ] + fd_config.model_config.think_end_id = tokenizer.get_vocab()[""] + fd_config.model_config.max_text_id = fd_config.model_config.im_patch_id + fd_config.model_config.sequence_parallel = False + # TODO (bukejiyu): Remove the assignment + fd_config.moe_config.top_k = 8 self.fd_config = fd_config + self.model_cfg = self.fd_config.model_config + self.image_preprocess = self._init_image_preprocess( + self.fd_config.model_config.vision_config + ) + from fastdeploy.model_executor.model_loader import \ + get_model_from_loader + + self.model = get_model_from_loader(self.fd_config) attn_backend_cls = get_attention_backend() num_heads = self.fd_config.model_config.num_attention_heads // \ self.fd_config.parallel_config.tensor_parallel_degree @@ -401,188 +359,6 @@ class GPUVLModelRunner(VLModelRunnerBase): self._init_kvcache() self.model.log_memory_usage("update all memory") - @paddle.no_grad() - def set_state_dict(self, args: argparse.Namespace) -> None: - """set_state_dict""" - if not self.is_safetensors_model: - rank_model_paths = [] - for root, dirs, files in os.walk(self.args.llm_model_name_or_path): - for file in files: - if file == f"model_state.tp0{self.tensor_parallel_rank}.pdparams": - rank_model_paths.append(os.path.join(root, file)) - elif file == "model_state.pdparams": - rank_model_paths.append(os.path.join(root, file)) - state_dict = {} - for path in rank_model_paths: - loaded_dict = paddle.load(path, return_numpy=True) - state_dict.update(loaded_dict) - - resampler_state = {} - for key in list(state_dict.keys()): - if "vision" in key: - state_dict.pop(key) - if key.startswith("ernie.resampler_model."): - value = state_dict.pop(key) - value = paddle.to_tensor(value).cast("bfloat16") - value = value.numpy() - resampler_state[ - key[len("ernie.resampler_model."):]] = value - elif key.startswith("resampler_model."): - value = state_dict.pop(key) - value = paddle.to_tensor(value).cast("bfloat16") - value = value.numpy() - resampler_state[key[len("resampler_model."):]] = value - self.model.set_state_dict(state_dict) - self.resampler_model.set_state_dict(resampler_state) - else: - state_dict = load_tp_checkpoint( - args.model_name_or_path, - Ernie4_5_PretrainedModel, - self.model_cfg, - return_numpy=True, - ) - for key in list(state_dict.keys()): - if key.startswith("vision_model.") or key.startswith( - "ernie.resampler_model."): - state_dict.pop(key) - self.model.set_state_dict(state_dict) - - @paddle.no_grad() - def vit_load( - self, - model_path: str, - tensor_parallel_degree: int, - tensor_parallel_rank: int, - ) -> None: - """ - Load vit tp weight - """ - if tensor_parallel_degree == 1: - rank_model_path = os.path.join(model_path, "model_state.pdparams") - else: - rank_model_path = os.path.join( - model_path, f"model_state_tp0{tensor_parallel_rank}.pdparams") - if os.path.exists(rank_model_path): - return paddle.load(rank_model_path, return_numpy=True) - else: - raise ValueError(f"No such a file {rank_model_path}") - - @paddle.no_grad() - def inject_pp_vision_model(self, args: argparse.Namespace, cfg: Ernie4_5_VLMoeConfig): - """ - Inject pp vision model - """ - - def set_vision_state_dict(model, - tensor_parallel_degree: int=8, - tensor_parallel_rank: int=0, - name: str=""): - """ - Set vision model weight - """ - model_state_dict = model.state_dict() - compat_keys = [name + k for k in model_state_dict.keys()] - model_files = set() - for k in compat_keys: - if k in self.weight_map.keys(): - model_files.add( - 
os.path.join(args.model_name_or_path, - self.weight_map[k])) - state_dict = {} - for model_file in model_files: - with safe_open(model_file, framework="np") as f: - for k in f.keys(): - if k in compat_keys: - new_k = k.replace(name, "") - tensor = f.get_tensor(k) - if tensor_parallel_degree > 1: - if "resampler_model" in name and new_k == "spatial_linear.0.weight": - tensor = np.split( - tensor, tensor_parallel_degree, - axis=0)[tensor_parallel_rank] - elif name == "vision_model.": - if "attn.proj.weight" in new_k or "fc2.weight" in new_k: - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=0)[tensor_parallel_rank] - elif "fc1.weight" in new_k or "fc1.bias" in new_k: - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-1)[tensor_parallel_rank] - elif "qkv.weight" in new_k: - head_dim = self.vision_config.hidden_size // self.vision_config.num_heads - tensor = tensor.reshape([ - self.vision_config.hidden_size, 3, - self.vision_config.num_heads, - head_dim - ]) - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-2 - )[tensor_parallel_rank].reshape([ - self.vision_config.hidden_size, -1 - ]) - elif "qkv.bias" in new_k: - head_dim = self.vision_config.hidden_size // self.vision_config.num_heads - tensor = tensor.reshape([ - 3, self.vision_config.num_heads, - head_dim - ]) - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-2 - )[tensor_parallel_rank].reshape([-1]) - state_dict[new_k] = tensor - model.set_state_dict(state_dict) - - vision_model = DFNRopeVisionTransformerPretrainedModel( - cfg.vision_config) - vision_model = paddle.amp.decorate(models=vision_model, - level="O2", - dtype="bfloat16") - vision_model.eval() - if not self.is_safetensors_model: - vit_state_dict = self.vit_load(args.vision_model_name_or_path, - self.tensor_parallel_degree, - self.tensor_parallel_rank) - vision_model.set_state_dict(vit_state_dict) - else: - set_vision_state_dict( - vision_model, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - name="vision_model.", - ) - - resampler_model = VariableResolutionResamplerModel( - cfg.pixel_hidden_size, - cfg.hidden_size, - cfg.spatial_conv_size, - cfg.temporal_conv_size, - config=cfg, - ) - resampler_model = paddle.amp.decorate(models=resampler_model, - level="O2", - dtype="bfloat16") - resampler_model.eval() - if self.is_safetensors_model: - is_ernie_begin = False - for k in self.weight_map.keys(): - if k.startswith("ernie.resampler_model."): - is_ernie_begin = True - set_vision_state_dict( - resampler_model, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - name="ernie.resampler_model." 
- if is_ernie_begin else "resampler_model.", - ) - return vision_model, resampler_model - @paddle.no_grad() def extract_vision_features(self, inputs: list[paddle.Tensor]) -> paddle.Tensor: """extract_vision_features""" @@ -607,7 +383,7 @@ class GPUVLModelRunner(VLModelRunnerBase): level="O2", dtype=self.dtype, ): - image_features = self.vision_model.extract_feature( + image_features = self.model.vision_model.extract_feature( images, grid_thw) if self.tensor_parallel_degree > 1: S, C = image_features.shape @@ -616,7 +392,7 @@ class GPUVLModelRunner(VLModelRunnerBase): image_features = ScatterOp.apply(image_features, axis=-1) # mp 切 Fea image_features = image_features.reshape([S, -1]) - image_features = self.resampler_model( + image_features = self.model.resampler_model( image_features, image_mask, token_type_ids_w_video, @@ -1074,195 +850,3 @@ class GPUVLModelRunner(VLModelRunnerBase): images=images, ) return result - - -def build_stream_line_model( - model_path: str, - dtype: str, - block_size: int, - max_model_len: int, - tokenizer: ErnieBotTokenizer, - quantization: str = "None", - graph_opt_config: Optional[GraphOptimizationConfig] = None -) -> tuple[FDConfig, paddle.nn.layer]: - """ - build model - """ - import contextlib - - from paddleformers.transformers.configuration_utils import PretrainedConfig - from paddleformers.trl import llm_utils - from paddleformers.utils.log import logger - - from fastdeploy.model_executor.layers.quantization import \ - get_quantization_config - from fastdeploy.model_executor.models.model_base import ModelRegistry - - config, _ = PretrainedConfig.get_config_dict(model_path) - config["head_dim"] = config.get( - "head_dim", config["hidden_size"] // config["num_attention_heads"]) - config["rope_theta"] = config.get("rope_theta", 10000.0) - rope_theta = config["rope_theta"] - model_config = ModelConfig.from_dict(config) - model_config.head_dim = config["head_dim"] - - parallel_config = ParallelConfig() - speculative_config = SpeculativeConfig() - device_config = DeviceConfig() - load_config = LoadConfig() - moe_config = MoEConfig() - kv_cache_config = KVCacheConfig() - kv_cache_config.cache_quant_dtype = "none" - - tensor_parallel_rank, tensor_parallel_degree = llm_utils.init_dist_env() - parallel_config.tensor_parallel_rank = tensor_parallel_rank - parallel_config.tensor_parallel_degree = tensor_parallel_degree - parallel_config.tensor_parallel_degree = tensor_parallel_degree - parallel_config.expert_parallel_degree = 1 - parallel_config.expert_parallel_rank = int(tensor_parallel_rank / - tensor_parallel_degree) - parallel_config.column_cut = False - - speculative_config.is_mtp = False - speculative_config.draft_type = "None" - - # Note(tangbinhan): used for load_checkpoint - model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank - model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree - model_config.is_mtp = speculative_config.is_mtp - moe_config.num_experts = None - - # use the length of tokenizer as the origin vocab size - ori_vocab_size = len(tokenizer) - moe_intermediate_size = (config.get("moe_intermediate_size", None), ) - if isinstance(moe_intermediate_size, list) or isinstance( - moe_intermediate_size, tuple): - moe_intermediate_size = moe_intermediate_size[0] - - num_key_value_heads = config.get("num_key_value_heads", -1) - if num_key_value_heads is None: - num_key_value_heads = -1 - - # RL need, some model num_key_value_heads less tensor_parallel_degree, need copy - if num_key_value_heads < 
tensor_parallel_degree: - logger.warning( - f"key value heads num is {num_key_value_heads}, tensor parallel degree is {tensor_parallel_degree}" - ) - num_key_value_heads = tensor_parallel_degree - - if config.get("ffn_hidden_size", None) is not None: - ffn_hidden_size = config["ffn_hidden_size"] - elif config.get("intermediate_size", None) is not None: - ffn_hidden_size = config["intermediate_size"] - else: - ffn_hidden_size = 4 * config["hidden_size"] - if config["hidden_act"].lower() == "swiglu": - if paddle.distributed.get_world_size() > 1: - multiple_of = 8 * config["num_attention_heads"] - else: - multiple_of = 4 * config["num_attention_heads"] - ffn_hidden_size = multiple_of * ( - (int(2 * ffn_hidden_size / 3) + multiple_of - 1) // - multiple_of) - - num_layers = config.get("num_layers", None) or config.get( - "num_hidden_layers", None) - if num_layers is None: - raise ValueError(f"num_layers<{num_layers}> is invalid") - - remove_tail_layer = config.get("remove_tail_layer") - if remove_tail_layer is True: - num_layers -= 1 - elif isinstance(remove_tail_layer, int): - num_layers -= remove_tail_layer - - moe_num_experts = config.get("moe_num_experts", 0) - if isinstance(moe_num_experts, list): - moe_num_experts = max(moe_num_experts) - use_moe = moe_num_experts > 0 - - context = contextlib.nullcontext() - - if config["hidden_act"].lower() == "swiglu": - model_config.hidden_act = "swiglu" - model_config.ffn_hidden_size = ffn_hidden_size - model_config.max_seq_len = max_model_len - model_config.num_layers = num_layers - model_config.dtype = dtype - parallel_config.block_size = block_size - - parallel_config.msg_queue_id = None - model_config.num_key_value_heads = num_key_value_heads - model_config.return_all_hidden_states = False - speculative_config.draft_type = "None" - model_config.start_layer_index = 0 - if use_moe: - moe_config.num_experts = config.get("moe_num_experts", None) - moe_config.moe_intermediate_size = config.get("moe_intermediate_size", - None) - moe_config.top_k = config.get("moe_topk", 8) - moe_config.moe_num_shared_experts = config.get( - "moe_num_shared_experts", 0) - moe_config.moe_layer_start_index = config.get("moe_layer_start_index", - None) - moe_config.moe_layer_end_index = config.get("moe_layer_end_index", - None) - - model_config.moe_phase = MoEPhase.PREFILL - model_config.ori_vocab_size = ori_vocab_size - - quantization_config = config.get("quantization_config", None) - - quant_config_name = None - if quantization_config is not None and quantization_config.get( - "quantization", None) is None: - raise ValueError( - "quantization_config should have a key named 'quantization' for specify quant config." - ) - - if quantization_config is not None: - quant_config_name = quantization_config["quantization"] - quant_cls = get_quantization_config(quant_config_name) - quant_config = quant_cls.from_config(quantization_config) - elif quantization != "None": - quantization_config = {} - if use_moe and quantization == "wint4": - quantization_config["dense_quant_type"] = "wint8" - quantization_config["moe_quant_type"] = "wint4" - quant_config_name = "mix_quant" - else: - quant_config_name = quantization - quant_cls = get_quantization_config(quant_config_name) - quant_config = quant_cls.from_config(quantization_config) - else: - quant_config = None - - logger.info("===========quantization_config==============") - if quant_config is not None: - logger.info(f"{quantization_config}") - else: - logger.info( - "No quantization config found and use original weight and act dtype." 
- ) - logger.info("============================================") - - fd_config = FDConfig( - model_config=model_config, - parallel_config=parallel_config, - speculative_config=speculative_config, - device_config=device_config, - load_config=load_config, - moe_config=moe_config, - quant_config=quant_config, - kv_cache_config=kv_cache_config, - graph_opt_config=graph_opt_config, - ) - fd_config.parallel_config.max_model_len = max_model_len - fd_config.model_config.rope_theta = rope_theta - - with context: - model_cls = ModelRegistry.get_class(model_config.architectures[0]) - model = model_cls(fd_config) - - model.eval() - return fd_config, model diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index c0b0e7d93..fdcddb47c 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -62,6 +62,27 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase: from fastdeploy.worker.gcu_worker import GcuWorker return GcuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank) +def init_distributed_environment(seed: int = 20) -> List[int]: + """ Initialize Paddle Fleet and get rank of worker """ + # Global rank + ranks = dist.get_world_size() + dist_strategy = fleet.DistributedStrategy() + + dist_strategy.hybrid_configs = { + "dp_degree": 1, + "mp_degree": ranks, + "pp_degree": 1, + "sharding_degree": 1, + } + + # Set control in tensor parallel + dist_strategy.tensor_parallel_configs = {"tensor_init_seed": seed} + fleet.init(is_collective=True, strategy=dist_strategy) + + # Local rank + local_rank = fleet.worker_index() + + return ranks, local_rank class PaddleDisWorkerProc(): """ @@ -74,6 +95,8 @@ class PaddleDisWorkerProc(): def __init__( self, fd_config: FDConfig, + ranks: int = 1, + local_rank: int = 0 ) -> None: """ Initialize a distributed worker and task queue for single-node multi-GPU setup. @@ -82,34 +105,11 @@ class PaddleDisWorkerProc(): attributes such as weight_dtype, act_dtype, mp_size, hidden_size, head_dim, num_attention_heads, and ffn_hidden_size. 
""" + self.ranks = ranks + self.local_rank = local_rank self.fd_config = fd_config self.parallel_config = fd_config.parallel_config - # Initialize distributed enviroment - (self.ranks, self.local_rank) = self.init_distributed_enviroment() - - assert self.parallel_config.tensor_parallel_degree * self.parallel_config.expert_parallel_degree == self.ranks - - self.fd_config.parallel_config.tensor_parallel_rank = \ - self.local_rank % self.parallel_config.tensor_parallel_degree - self.fd_config.parallel_config.expert_parallel_rank = \ - int(self.local_rank / self.parallel_config.tensor_parallel_degree) - - if self.fd_config.parallel_config.use_ep: - self.fd_config.moe_config.num_experts_per_rank = \ - self.fd_config.moe_config.num_experts // self.parallel_config.expert_parallel_degree - self.fd_config.moe_config.num_experts_start_offset = \ - self.fd_config.parallel_config.expert_parallel_rank * self.fd_config.moe_config.num_experts_per_rank - - # For auto TP split - self.fd_config.model_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_degree - self.fd_config.model_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank - self.fd_config.model_config.use_ep = self.parallel_config.use_ep - - if self.fd_config.parallel_config.use_ep: - self.fd_config.model_config.num_experts_per_rank = self.fd_config.moe_config.num_experts_per_rank - self.fd_config.model_config.num_experts_start_offset = self.fd_config.moe_config.num_experts_start_offset - # TODO(gongshaotian): Use worker factory to get worker self.worker = get_worker(fd_config=fd_config, local_rank=self.local_rank, @@ -124,8 +124,7 @@ class PaddleDisWorkerProc(): is_server=False, num_client=self.parallel_config.tensor_parallel_degree, client_id=self.parallel_config.tensor_parallel_rank, - local_data_parallel_id=self.fd_config.parallel_config. 
- expert_parallel_rank) + local_data_parallel_id=self.parallel_config.expert_parallel_rank) def init_health_status(self) -> None: """ @@ -312,27 +311,6 @@ class PaddleDisWorkerProc(): self.exist_prefill_task_signal.value[ 0] = self.worker.prefill_finished() - def init_distributed_enviroment(self, seed: int = 20) -> List[int]: - """ Initialize Paddle Fleet and get rank of worker """ - # Global rank - self.ranks = dist.get_world_size() - dist_strategy = fleet.DistributedStrategy() - - dist_strategy.hybrid_configs = { - "dp_degree": 1, - "mp_degree": self.ranks, - "pp_degree": 1, - "sharding_degree": 1, - } - - # Set control in tensor parallel - dist_strategy.tensor_parallel_configs = {"tensor_init_seed": seed} - fleet.init(is_collective=True, strategy=dist_strategy) - - # Local rank - self.local_rank = fleet.worker_index() - - return self.ranks, self.local_rank def determine_num_available_blocks(self) -> None: """Profiles the peak memory usage of the model to determine how many @@ -581,7 +559,7 @@ def parse_args(): return args -def initialize_fd_config(config_or_args) -> FDConfig: +def initialize_fd_config(config_or_args, ranks: int = 1, local_rank: int = 0) -> FDConfig: """Initialize FDConfig from either RolloutModelConfig or argparse.Namespace Args: @@ -593,14 +571,12 @@ def initialize_fd_config(config_or_args) -> FDConfig: # Get model config from model directory model_config_dict, _ = ModelConfig.get_config_dict(config_or_args.model_name_or_path) - # Handle MoE related configs if 'num_experts' in model_config_dict: model_config_dict['moe_num_experts'] = model_config_dict.pop('num_experts') if 'num_experts_per_tok' in model_config_dict: model_config_dict['moe_topk'] = model_config_dict.pop('num_experts_per_tok') - # Set default values for model config model_config_dict["head_dim"] = model_config_dict.get( "head_dim", model_config_dict["hidden_size"] // model_config_dict["num_attention_heads"]) @@ -661,7 +637,6 @@ def initialize_fd_config(config_or_args) -> FDConfig: parallel_config.kv_cache_ratio = getattr(config_or_args, 'kv_cache_ratio', 1.0) parallel_config.first_token_id = getattr(config_or_args, 'first_token_id', None) parallel_config.gpu_memory_utilization = getattr(config_or_args, 'gpu_memory_utilization', 0.9) - parallel_config.engine_pid = getattr(config_or_args, 'engine_pid', None) parallel_config.do_profile = getattr(config_or_args, 'do_profile', False) parallel_config.dynamic_load_weight = getattr(config_or_args, 'dynamic_load_weight', False) parallel_config.pad_token_id = getattr(config_or_args, 'pad_token_id', None) @@ -719,7 +694,17 @@ def initialize_fd_config(config_or_args) -> FDConfig: if num_layers is None: raise ValueError(f"num_layers<{num_layers}> is invalid") - use_moe = model_config_dict.get("moe_layer_start_index", num_layers) < num_layers + if "moe_layer_start_index" in model_config_dict: + moe_layer_start_index = model_config_dict["moe_layer_start_index"] + use_moe = ( + isinstance(moe_layer_start_index, int) + and moe_layer_start_index < num_layers + ) or ( + isinstance(moe_layer_start_index, list) + and min(moe_layer_start_index) < num_layers + ) + else: + use_moe = False # Update model config model_config.ffn_hidden_size = ffn_hidden_size @@ -749,6 +734,28 @@ def initialize_fd_config(config_or_args) -> FDConfig: model_config.deepseekv3 = AutoConfig.from_pretrained( config_or_args.model_name_or_path) + assert parallel_config.tensor_parallel_degree * parallel_config.expert_parallel_degree == ranks + + parallel_config.tensor_parallel_rank = \ + local_rank % 
parallel_config.tensor_parallel_degree + parallel_config.expert_parallel_rank = \ + int(local_rank / parallel_config.tensor_parallel_degree) + + if parallel_config.use_ep: + moe_config.num_experts_per_rank = \ + moe_config.num_experts // parallel_config.expert_parallel_degree + moe_config.num_experts_start_offset = \ + parallel_config.expert_parallel_rank * moe_config.num_experts_per_rank + + # For auto TP split + model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree + model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank + model_config.use_ep = parallel_config.use_ep + + if parallel_config.use_ep: + model_config.num_experts_per_rank = moe_config.num_experts_per_rank + model_config.num_experts_start_offset = moe_config.num_experts_start_offset + # Handle quantization config quantization_config = model_config_dict.get("quantization_config", None) if not model_config.is_quantized: @@ -771,7 +778,9 @@ def initialize_fd_config(config_or_args) -> FDConfig: quantization_config["quantization"] = quant_config_name # Special handling for Ernie models is_ernie = "Ernie4_5_ForCausalLM" in model_config_dict.get("architectures", []) or \ - "Ernie4_5_MoeForCausalLM" in model_config_dict.get("architectures", []) + "Ernie4_5_MoeForCausalLM" in model_config_dict.get("architectures", []) or \ + "Ernie4_5_VLMoeForConditionalGeneration" in model_config_dict.get( + "architectures", []) if use_moe and quant_config_name == "wint4" and is_ernie: quantization_config["dense_quant_type"] = "wint8" quantization_config["moe_quant_type"] = "wint4" @@ -840,11 +849,13 @@ def run_worker_proc() -> None: # Get args form Engine args = parse_args() + ranks, local_rank = init_distributed_environment() + # Get fd_config - fd_config = initialize_fd_config(args) + fd_config = initialize_fd_config(args, ranks, local_rank) # Create worker process - worker_proc = PaddleDisWorkerProc(fd_config) + worker_proc = PaddleDisWorkerProc(fd_config, ranks, local_rank) # Initialize device and create model runner worker_proc.init_device()
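
The load_state_dict() methods added above for the vision tower (DFNRopeVisionTransformerPretrainedModel) and the resampler (VariableResolutionResamplerModel) follow one prefix-based pattern: every local parameter is looked up in the checkpoint under "<prefix_name>.<param_name>", shape-checked, and copied in place. Below is a minimal standalone sketch of that pattern, assuming a hypothetical TinyModule and a "vision_model" prefix; only the key lookup, the shape check, and the param.copy_(tensor, False) call mirror the patch.

import paddle
from paddle import nn


class TinyModule(nn.Layer):
    """Hypothetical module used only to illustrate prefix-based loading."""

    def __init__(self, prefix_name: str = ""):
        super().__init__()
        self.prefix_name = prefix_name
        self.proj = nn.Linear(4, 4)

    def load_state_dict(self, state_dict):
        # Look up each parameter as "<prefix_name>.<param_name>", validate the
        # shape, then copy the tensor into the existing parameter in place.
        params_dict = dict(self.named_parameters())
        for param_name, param in params_dict.items():
            key = f"{self.prefix_name}.{param_name}"
            if key not in state_dict:
                raise ValueError(f"The key {key} does not exist in state_dict.")
            tensor = paddle.to_tensor(state_dict.pop(key))
            if param.shape != tensor.shape:
                raise ValueError(
                    f"{key} param.shape={param.shape} tensor.shape={tensor.shape}")
            param.copy_(tensor, False)


# Checkpoint keys carry the module prefix, as the new loaders expect.
module = TinyModule(prefix_name="vision_model")
ckpt = {
    "vision_model.proj.weight": paddle.randn([4, 4]),
    "vision_model.proj.bias": paddle.zeros([4]),
}
module.load_state_dict(ckpt)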
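
The extended build_expanded_keys() also expands the new text_export_id / img_export_id placeholders, with image experts numbered after the text experts on the same expert axis. The sketch below illustrates that numbering only; expand_img_expert_keys is a hypothetical helper (not the FastDeploy function), and the "ernie." prefix in the template is an assumption.

def expand_img_expert_keys(template, start_layer, num_layers,
                           text_num_experts, img_num_experts):
    # Image-expert ids start at text_num_experts, matching the
    # range(text_num_experts, text_num_experts + img_num_experts) in the hunk above.
    keys = []
    for layer_id in range(start_layer, num_layers):
        for expert_id in range(text_num_experts,
                               text_num_experts + img_num_experts):
            keys.append(template.format(moe_layer_id=layer_id,
                                        img_export_id=expert_id))
    return keys


keys = expand_img_expert_keys(
    "ernie.layers.{moe_layer_id}.mlp.experts.{img_export_id}.up_gate_proj.weight",
    start_layer=1, num_layers=3, text_num_experts=64, img_num_experts=64)
# keys[0] -> "ernie.layers.1.mlp.experts.64.up_gate_proj.weight"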