[vl]remove duplicated load logic (#2744)

bukejiyu
2025-07-13 07:36:26 +08:00
committed by GitHub
parent 16940822a7
commit bad53c6b6e
11 changed files with 510 additions and 632 deletions


@@ -62,6 +62,27 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase:
from fastdeploy.worker.gcu_worker import GcuWorker
return GcuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
def init_distributed_environment(seed: int = 20) -> List[int]:
""" Initialize Paddle Fleet and get rank of worker """
# Global rank
ranks = dist.get_world_size()
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": ranks,
"pp_degree": 1,
"sharding_degree": 1,
}
# Set control in tensor parallel
dist_strategy.tensor_parallel_configs = {"tensor_init_seed": seed}
fleet.init(is_collective=True, strategy=dist_strategy)
# Local rank
local_rank = fleet.worker_index()
return ranks, local_rank
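
For orientation: the helper above places every process into a single Fleet model-parallel group (mp_degree equals the world size); the actual TP/EP split is derived later from the returned pair inside initialize_fd_config. Below is a minimal, hedged usage sketch; the launcher, seed value, and print are illustrative and not part of this diff.

# Hedged sketch (not from this commit): each worker process spawned by the
# distributed launcher calls the new module-level helper exactly once and
# then carries (ranks, local_rank) through config construction.
ranks, local_rank = init_distributed_environment(seed=20)
print(f"world size: {ranks}, local rank: {local_rank}")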
class PaddleDisWorkerProc():
"""
@@ -74,6 +95,8 @@ class PaddleDisWorkerProc():
def __init__(
self,
fd_config: FDConfig,
ranks: int = 1,
local_rank: int = 0
) -> None:
"""
Initialize a distributed worker and task queue for single-node multi-GPU setup.
@@ -82,34 +105,11 @@ class PaddleDisWorkerProc():
attributes such as weight_dtype, act_dtype, mp_size, hidden_size, head_dim,
num_attention_heads, and ffn_hidden_size.
"""
self.ranks = ranks
self.local_rank = local_rank
self.fd_config = fd_config
self.parallel_config = fd_config.parallel_config
# Initialize distributed enviroment
(self.ranks, self.local_rank) = self.init_distributed_enviroment()
assert self.parallel_config.tensor_parallel_degree * self.parallel_config.expert_parallel_degree == self.ranks
self.fd_config.parallel_config.tensor_parallel_rank = \
self.local_rank % self.parallel_config.tensor_parallel_degree
self.fd_config.parallel_config.expert_parallel_rank = \
int(self.local_rank / self.parallel_config.tensor_parallel_degree)
if self.fd_config.parallel_config.use_ep:
self.fd_config.moe_config.num_experts_per_rank = \
self.fd_config.moe_config.num_experts // self.parallel_config.expert_parallel_degree
self.fd_config.moe_config.num_experts_start_offset = \
self.fd_config.parallel_config.expert_parallel_rank * self.fd_config.moe_config.num_experts_per_rank
# For auto TP split
self.fd_config.model_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_degree
self.fd_config.model_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank
self.fd_config.model_config.use_ep = self.parallel_config.use_ep
if self.fd_config.parallel_config.use_ep:
self.fd_config.model_config.num_experts_per_rank = self.fd_config.moe_config.num_experts_per_rank
self.fd_config.model_config.num_experts_start_offset = self.fd_config.moe_config.num_experts_start_offset
# TODO(gongshaotian): Use worker factory to get worker
self.worker = get_worker(fd_config=fd_config,
local_rank=self.local_rank,
@@ -124,8 +124,7 @@ class PaddleDisWorkerProc():
is_server=False,
num_client=self.parallel_config.tensor_parallel_degree,
client_id=self.parallel_config.tensor_parallel_rank,
local_data_parallel_id=self.fd_config.parallel_config.
expert_parallel_rank)
local_data_parallel_id=self.parallel_config.expert_parallel_rank)
def init_health_status(self) -> None:
"""
@@ -312,27 +311,6 @@ class PaddleDisWorkerProc():
self.exist_prefill_task_signal.value[
0] = self.worker.prefill_finished()
def init_distributed_enviroment(self, seed: int = 20) -> List[int]:
""" Initialize Paddle Fleet and get rank of worker """
# Global rank
self.ranks = dist.get_world_size()
dist_strategy = fleet.DistributedStrategy()
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": self.ranks,
"pp_degree": 1,
"sharding_degree": 1,
}
# Set control in tensor parallel
dist_strategy.tensor_parallel_configs = {"tensor_init_seed": seed}
fleet.init(is_collective=True, strategy=dist_strategy)
# Local rank
self.local_rank = fleet.worker_index()
return self.ranks, self.local_rank
def determine_num_available_blocks(self) -> None:
"""Profiles the peak memory usage of the model to determine how many
@@ -581,7 +559,7 @@ def parse_args():
return args
def initialize_fd_config(config_or_args) -> FDConfig:
def initialize_fd_config(config_or_args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
"""Initialize FDConfig from either RolloutModelConfig or argparse.Namespace
Args:
@@ -593,14 +571,12 @@ def initialize_fd_config(config_or_args) -> FDConfig:
# Get model config from model directory
model_config_dict, _ = ModelConfig.get_config_dict(config_or_args.model_name_or_path)
# Handle MoE related configs
if 'num_experts' in model_config_dict:
model_config_dict['moe_num_experts'] = model_config_dict.pop('num_experts')
if 'num_experts_per_tok' in model_config_dict:
model_config_dict['moe_topk'] = model_config_dict.pop('num_experts_per_tok')
# Set default values for model config
model_config_dict["head_dim"] = model_config_dict.get(
"head_dim", model_config_dict["hidden_size"] // model_config_dict["num_attention_heads"])
@@ -661,7 +637,6 @@ def initialize_fd_config(config_or_args) -> FDConfig:
parallel_config.kv_cache_ratio = getattr(config_or_args, 'kv_cache_ratio', 1.0)
parallel_config.first_token_id = getattr(config_or_args, 'first_token_id', None)
parallel_config.gpu_memory_utilization = getattr(config_or_args, 'gpu_memory_utilization', 0.9)
parallel_config.engine_pid = getattr(config_or_args, 'engine_pid', None)
parallel_config.do_profile = getattr(config_or_args, 'do_profile', False)
parallel_config.dynamic_load_weight = getattr(config_or_args, 'dynamic_load_weight', False)
parallel_config.pad_token_id = getattr(config_or_args, 'pad_token_id', None)
@@ -719,7 +694,17 @@ def initialize_fd_config(config_or_args) -> FDConfig:
if num_layers is None:
raise ValueError(f"num_layers<{num_layers}> is invalid")
use_moe = model_config_dict.get("moe_layer_start_index", num_layers) < num_layers
if "moe_layer_start_index" in model_config_dict:
moe_layer_start_index = model_config_dict["moe_layer_start_index"]
use_moe = (
isinstance(moe_layer_start_index, int)
and moe_layer_start_index < num_layers
) or (
isinstance(moe_layer_start_index, list)
and min(moe_layer_start_index) < num_layers
)
else:
use_moe = False
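
A short illustration of the new MoE detection: moe_layer_start_index may now be either a single int or a list of ints (apparently one start index per expert group in multimodal checkpoints), and MoE is treated as enabled only if the earliest MoE layer falls inside the model. The helper below is a hedged, standalone sketch of the same rule, not code from this commit; it adds an empty-list guard for safety.

def _uses_moe(model_config_dict: dict, num_layers: int) -> bool:
    # Sketch of the check above: accept an int or a non-empty list of ints.
    start = model_config_dict.get("moe_layer_start_index")
    if isinstance(start, int):
        return start < num_layers
    if isinstance(start, list) and start:
        return min(start) < num_layers
    return False

assert _uses_moe({"moe_layer_start_index": 1}, 28)
assert _uses_moe({"moe_layer_start_index": [28, 3]}, 28)
assert not _uses_moe({}, 28)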
# Update model config
model_config.ffn_hidden_size = ffn_hidden_size
@@ -749,6 +734,28 @@ def initialize_fd_config(config_or_args) -> FDConfig:
model_config.deepseekv3 = AutoConfig.from_pretrained(
config_or_args.model_name_or_path)
assert parallel_config.tensor_parallel_degree * parallel_config.expert_parallel_degree == ranks
parallel_config.tensor_parallel_rank = \
local_rank % parallel_config.tensor_parallel_degree
parallel_config.expert_parallel_rank = \
int(local_rank / parallel_config.tensor_parallel_degree)
if parallel_config.use_ep:
moe_config.num_experts_per_rank = \
moe_config.num_experts // parallel_config.expert_parallel_degree
moe_config.num_experts_start_offset = \
parallel_config.expert_parallel_rank * moe_config.num_experts_per_rank
# For auto TP split
model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree
model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank
model_config.use_ep = parallel_config.use_ep
if parallel_config.use_ep:
model_config.num_experts_per_rank = moe_config.num_experts_per_rank
model_config.num_experts_start_offset = moe_config.num_experts_start_offset
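
A worked example of the rank bookkeeping that now lives in initialize_fd_config: with world size ranks = tensor_parallel_degree * expert_parallel_degree, each local rank maps to a tensor-parallel rank via modulo and an expert-parallel rank via integer division, and each EP rank owns a contiguous slice of experts. The numbers below are purely illustrative.

ranks = 8                          # assumed: tensor_parallel_degree=4, expert_parallel_degree=2
tp_degree, ep_degree = 4, 2
num_experts = 64                   # assumed total expert count
assert tp_degree * ep_degree == ranks

for local_rank in range(ranks):
    tp_rank = local_rank % tp_degree           # position inside the TP group
    ep_rank = local_rank // tp_degree          # which expert-parallel group
    experts_per_rank = num_experts // ep_degree
    start_offset = ep_rank * experts_per_rank  # first expert owned by this rank
    # e.g. local_rank 5 -> tp_rank 1, ep_rank 1, experts [32, 64)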
# Handle quantization config
quantization_config = model_config_dict.get("quantization_config", None)
if not model_config.is_quantized:
@@ -771,7 +778,9 @@ def initialize_fd_config(config_or_args) -> FDConfig:
quantization_config["quantization"] = quant_config_name
# Special handling for Ernie models
is_ernie = "Ernie4_5_ForCausalLM" in model_config_dict.get("architectures", []) or \
"Ernie4_5_MoeForCausalLM" in model_config_dict.get("architectures", [])
"Ernie4_5_MoeForCausalLM" in model_config_dict.get("architectures", []) or \
"Ernie4_5_VLMoeForConditionalGeneration" in model_config_dict.get(
"architectures", [])
if use_moe and quant_config_name == "wint4" and is_ernie:
quantization_config["dense_quant_type"] = "wint8"
quantization_config["moe_quant_type"] = "wint4"
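
To make the Ernie-specific branch concrete, a hedged sketch of its net effect for an Ernie 4.5 MoE (or, after this change, VL MoE) checkpoint requested as wint4: dense layers fall back to 8-bit weight-only quantization while expert weights stay 4-bit. Any fields set outside the lines shown here are not covered by this diff.

# Illustrative result only, based on the branch above.
quantization_config = {
    "quantization": "wint4",      # quant_config_name selected for the model
    "dense_quant_type": "wint8",  # dense (non-expert) layers: 8-bit weights
    "moe_quant_type": "wint4",    # MoE expert layers: 4-bit weights
}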
@@ -840,11 +849,13 @@ def run_worker_proc() -> None:
# Get args from Engine
args = parse_args()
ranks, local_rank = init_distributed_environment()
# Get fd_config
fd_config = initialize_fd_config(args)
fd_config = initialize_fd_config(args, ranks, local_rank)
# Create worker process
worker_proc = PaddleDisWorkerProc(fd_config)
worker_proc = PaddleDisWorkerProc(fd_config, ranks, local_rank)
# Initialize device and create model runner
worker_proc.init_device()