diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index 0f34e761b..55bf59b8e 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -378,9 +378,7 @@ class LoadConfig:
         dynamic_load_weight: Whether to enable dynamic weight loading
         load_strategy: Specifies the weight loading method when enabled:
             - 'ipc': Real-time IPC streaming with automatic resharding
-            - 'ipc_no_reshard': Real-time IPC streaming without weight process
             - 'ipc_snapshot': Load from disk snapshot of IPC weights
-            - 'meta': provide RL traing worker, no_weights_load
             - None: No dynamic loading
     """
     def __init__(
@@ -389,7 +387,7 @@ class LoadConfig:
     ):
         self.use_fastsafetensor = int(envs.FD_USE_FASTSAFETENSOR) == 1
         self.dynamic_load_weight: bool = False
-        self.load_strategy: Optional[Literal['ipc', 'ipc_no_reshard', 'ipc_snapshot', 'meta']] = None
+        self.load_strategy: Optional[Literal['ipc', 'ipc_snapshot']] = None
         for key, value in args.items():
             if hasattr(self, key):
                 setattr(self, key, value)
diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py
index 59d0daf32..ba415978b 100644
--- a/fastdeploy/engine/args_utils.py
+++ b/fastdeploy/engine/args_utils.py
@@ -92,7 +92,7 @@ class EngineArgs:
     """
    dynamic load weight
     """
-    load_strategy: str = "meta"
+    load_strategy: str = "ipc_snapshot"
     """
    dynamic load weight strategy
     """
diff --git a/fastdeploy/engine/config.py b/fastdeploy/engine/config.py
index 01ede8587..2d49aa0ce 100644
--- a/fastdeploy/engine/config.py
+++ b/fastdeploy/engine/config.py
@@ -43,7 +43,7 @@ class ModelConfig:
                  model_name_or_path: str,
                  config_json_file: str = "config.json",
                  dynamic_load_weight: bool = False,
-                 load_strategy: str = "meta",
+                 load_strategy: str = "ipc_snapshot",
                  quantization: str = None,
                  download_dir: Optional[str] = None):
         """
diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index 5b9b72656..2494f298a 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -140,7 +140,7 @@ class FusedMoE(nn.Layer):
             shape=gate_weight_shape,
             dtype="float32",
         )
-        if self.model_config.moe_use_aux_free:
+        if self.fd_config.model_config.moe_use_aux_free:
             self.gate_correction_bias = self.create_parameter(
                 shape=gate_correction_bias_shape,
                 dtype="float32",
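After this change only two strategies remain selectable. A minimal sketch of how they are picked up, assuming the dict-based `LoadConfig` constructor shown in the first hunk:

```python
from fastdeploy.config import LoadConfig

# LoadConfig.__init__ copies every matching key from the args dict onto
# the instance; it does no validation itself, so an unknown strategy is
# kept as a plain string and only rejected later by the worker's argparse.
cfg = LoadConfig({
    "dynamic_load_weight": True,
    "load_strategy": "ipc_snapshot",  # or "ipc" for real-time streaming
})
assert cfg.load_strategy in ("ipc", "ipc_snapshot")
```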
vit_names.append(f"vision_model.blocks.{layer_idx}.attn.proj.bias") - - vit_names.append(f"vision_model.blocks.{layer_idx}.mlp.fc1.weight") - vit_names.append(f"vision_model.blocks.{layer_idx}.mlp.fc1.bias") - - vit_names.append(f"vision_model.blocks.{layer_idx}.mlp.fc2.weight") - vit_names.append(f"vision_model.blocks.{layer_idx}.mlp.fc2.bias") - - for train_name in vit_names: - infer_to_train[train_name] = train_name - - return infer_to_train - def rot_pos_emb(self, grid_thw, num_pad=0): """_summary_ diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index f592c8abb..d9503bbe2 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -513,7 +513,7 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM): model_config.spatial_conv_size, model_config.temporal_conv_size, config=model_config, - prefix_name="ernie.resampler_model", + prefix_name="resampler_model", ) resampler_model = paddle.amp.decorate( models=resampler_model, level="O2", dtype="bfloat16" diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py index 830abc2a4..c87e6db78 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/modeling_resampler.py @@ -210,31 +210,6 @@ class VariableResolutionResamplerModel(nn.Layer): mark_as_sequence_parallel_parameter(self.mlp.bias) mark_as_sequence_parallel_parameter(self.after_norm.weight) - def get_name_mappings_to_training(self, ): - """ get_name_mappings_to_training """ - infer_to_train = {} - resampler_names = [ - "ernie.resampler_model.spatial_linear.0.weight", - "ernie.resampler_model.spatial_linear.0.bias", - "ernie.resampler_model.spatial_linear.2.weight", - "ernie.resampler_model.spatial_linear.2.bias", - "ernie.resampler_model.spatial_linear.3.weight", - "ernie.resampler_model.spatial_linear.3.bias", - "ernie.resampler_model.temporal_linear.0.weight", - "ernie.resampler_model.temporal_linear.0.bias", - "ernie.resampler_model.temporal_linear.2.weight", - "ernie.resampler_model.temporal_linear.2.bias", - "ernie.resampler_model.temporal_linear.3.weight", - "ernie.resampler_model.temporal_linear.3.bias", - "ernie.resampler_model.mlp.weight", - "ernie.resampler_model.mlp.bias", - "ernie.resampler_model.after_norm.weight", - ] - for train_name in resampler_names: - infer_to_train[train_name[len("ernie."):]] = train_name - - return infer_to_train - def spatial_conv_reshape(self, x, spatial_conv_size): """ Linear 前的 reshape,为了让 Linear 能模仿 conv 的感受野 @@ -376,9 +351,11 @@ class VariableResolutionResamplerModel(nn.Layer): for param_name, param in params_dict.items(): state_dict_key = f"{self.prefix_name}.{param_name}" if state_dict_key not in state_dict: - raise ValueError( - f"The key {state_dict_key} does not exist in state_dict. " - ) + state_dict_key = f"ernie.{self.prefix_name}.{param_name}" + if state_dict_key not in state_dict: + raise ValueError( + f"The key {state_dict_key} does not exist in state_dict. 
" + ) tensor = get_tensor(state_dict.pop(state_dict_key)) if param.shape != tensor.shape: raise ValueError( diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index e2fbcb6c8..1040221e4 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -16,7 +16,7 @@ import os import time from multiprocessing.shared_memory import SharedMemory -from typing import Any, Dict, List +from typing import Any, Dict import numpy as np import paddle @@ -24,9 +24,6 @@ from paddle import nn from paddleformers.utils.log import logger from fastdeploy.config import FDConfig -from fastdeploy.model_executor.load_weight_utils import \ - load_composite_checkpoint -from fastdeploy.model_executor.model_loader import MODEL_CLASSES class DynamicWeightManager: @@ -43,11 +40,9 @@ class DynamicWeightManager: self.meta_src_id = self._get_gpu_id() self.first_load = True self.ipc_path = f"/shared_ipc_meta/ipc_metas_{self.meta_src_id}" - self.models: List[nn.Layer] = [model] + self.model: nn.Layer = model self._capture_model_state() - - if self.load_config.load_strategy != "meta": - self.update_parameters() + self.update_parameters() logger.info( f"✅ DynamicLoad model built successfully by {self.load_config.load_strategy}, " @@ -56,17 +51,11 @@ class DynamicWeightManager: @paddle.no_grad() def _capture_model_state(self): """Capture and store initial model parameters state.""" - for model in self.models: - for name, param in model.state_dict().items(): - logger.debug( - f"Model param: {name}, shape={param.shape}, dtype={param.dtype}" - ) - self.state_dict[name] = param - - def add_model(self, model: nn.Layer): - """"add model""" - self.models.append(model) - self._capture_model_state() + for name, param in self.model.state_dict().items(): + logger.debug( + f"Model param: {name}, shape={param.shape}, dtype={param.dtype}" + ) + self.state_dict[name] = param def update_parameters(self, pid: int = 0) -> None: """Core method to update model parameters based on strategy.""" @@ -79,8 +68,6 @@ class DynamicWeightManager: strategy_handlers = { "ipc_snapshot": self._update_ipc_snapshot, "ipc": self._update_ipc, - "ipc_no_reshard": self._update_ipc_no_reshard, - "normal": self.load_model, } if handler := strategy_handlers.get(self.load_config.load_strategy): @@ -106,13 +93,7 @@ class DynamicWeightManager: fallback_path = f"/shared_ipc_meta/model_state.tp0{self.meta_src_id}.pdparams" ipc_state_dict = paddle.load(fallback_path) - try: - self._update_model_from_state(ipc_state_dict, "snapshot") - except Exception: - self.models[0].set_state_dict(ipc_state_dict) - logger.warning( - "load model from no_reshard weight, maybe need more GPU memory" - ) + self._update_model_from_state(ipc_state_dict, "snapshot") logger.info( f"IPC snapshot update parameters completed from {model_path}") @@ -124,34 +105,12 @@ class DynamicWeightManager: logger.info( f"IPC update parameters completed from file: {self.ipc_path}") - def _update_ipc_no_reshard(self): - """Update using no-reshard IPC strategy (faster but uses more memory).""" - ipc_meta = paddle.load(self.ipc_path) - state_dict = self._convert_ipc_meta_to_tensor(ipc_meta) - self.models[0].set_state_dict(state_dict) - logger.info( - f"IPC no-reshard update parameters completed from file: {self.ipc_path}" - ) - - def load_model(self) -> nn.Layer: - """Standard model loading without IPC.""" - architectures = self.fd_config.model_config.architectures[0] - model_class = MODEL_CLASSES[architectures] - state_dict = 
diff --git a/fastdeploy/rl/rollout_config.py b/fastdeploy/rl/rollout_config.py
index a662d130a..ac67d02f0 100644
--- a/fastdeploy/rl/rollout_config.py
+++ b/fastdeploy/rl/rollout_config.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 """
 
+
 from fastdeploy.worker.worker_process import initialize_fd_config
 
 
@@ -24,7 +25,7 @@ class RolloutModelConfig:
                  max_model_len: int = 32768,
                  tensor_parallel_size: int = 4,
                  dynamic_load_weight: bool = True,
-                 load_strategy: str = "meta",
+                 load_strategy: str = "ipc_snapshot",
                  enable_mm: bool = False,
                  # Default values for all other parameters
                  max_num_seqs: int = 34,
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index a160138f3..4aa420a5e 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -535,14 +535,11 @@ def parse_args():
     parser.add_argument(
         "--load_strategy",
         type=str,
-        choices=['ipc', 'ipc_no_reshard', 'ipc_snapshot', 'meta', 'normal'],
-        default='meta',
+        choices=['ipc', 'ipc_snapshot'],
+        default="ipc_snapshot",
         help="Weight loading method when dynamic loading is enabled: "
         "'ipc': real-time IPC streaming with automatic resharding, "
-        "'ipc_no_reshard': IPC streaming without weight processing, "
-        "'ipc_snapshot': load from disk snapshot of IPC weights, "
-        "'meta': provide RL traing worker, no_weights_load"
-        "'normal':normal load weight")
+        "'ipc_snapshot': load from disk snapshot of IPC weights.")
     parser.add_argument("--enable_mm",
                         type=str,
                         default="false",
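The same two-value whitelist is what a worker launch now validates against. A small runnable check of the trimmed argparse contract, with the parser rebuilt in isolation to mirror the hunk above:

```python
import argparse

# Mirror of the trimmed --load_strategy argument; anything outside the
# two remaining choices (e.g. the old 'meta' or 'normal') fails at parse time.
parser = argparse.ArgumentParser()
parser.add_argument("--load_strategy", type=str,
                    choices=["ipc", "ipc_snapshot"],
                    default="ipc_snapshot")

args = parser.parse_args([])
assert args.load_strategy == "ipc_snapshot"   # new default
args = parser.parse_args(["--load_strategy", "ipc"])
assert args.load_strategy == "ipc"
```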