FastDeploy/fastdeploy/eplb/async_expert_loader.py
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import ctypes
import os
import time
import traceback
from typing import List, Tuple
import numpy as np
import paddle
try:
from cuda import cudart
except ImportError:
cudart = None
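# cuda-python is an optional dependency at import time; create_mmap() raises a
# descriptive ImportError later if it is needed but missing.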
from fastdeploy.config import EPLBConfig
REARRANGE_EXPERT_MAGIC_NUM = 147183647
REARRANGE_ORIGINATOR_EP_RANK = 0
CHECK_TIME_INTERNAL = 3
HTTP_RETRY_NUM = 5
CHECK_TIMEOUT = 120
libc = ctypes.CDLL(None, use_errno=True)  # use_errno=True so ctypes.get_errno() reflects mmap failures
libc.mmap.argtypes = [
ctypes.c_void_p, # void *addr
ctypes.c_size_t, # size_t length
ctypes.c_int, # int prot
ctypes.c_int, # int flags
ctypes.c_int, # int fd
ctypes.c_size_t, # off_t offset
]
libc.mmap.restype = ctypes.c_void_p
libc.munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
libc.munmap.restype = ctypes.c_int
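# NOTE: the last mmap argument is declared c_size_t rather than off_t; both are
# 64-bit on Linux x86-64, and the offset passed below is always 0.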
PROT_READ = 0x1
PROT_WRITE = 0x2
MAP_SHARED = 0x01
MAP_ANONYMOUS = 0x20
MAP_FAILED = ctypes.c_void_p(-1).value  # mmap() returns (void *)-1 on failure; with a c_void_p restype it arrives as an unsigned int
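# Values mirror <sys/mman.h> on Linux x86-64; MAP_ANONYMOUS in particular is
# platform-specific.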
G = 1024**3
TOTAL_MODEL_SIZE = 350  # GiB; rough full-model footprint used to size per-rank shm by default
MAIN_MODEL_REDUNDANT_SHM_SIZE = 5  # GiB (assumed unit, matching the sizes above)
MODEL_MAIN_NAME = "eplb_main"
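# Data flow: a background process loads expert weights from disk, serializes
# them into per-rank files under /dev/shm (created and pinned by create_mmap),
# and ships (name, offset, size, shape, dtype) records back over a pipe so the
# consumer can rebuild zero-copy tensors with load_tensor_from_shm_mem.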
def create_mmap(model_name: List[str], ep_rank: int, ep_size: int, shm_uuid: str, eplb_config: EPLBConfig, logger=None):
"""Create one /dev/shm-backed mapping per model name, pin it with CUDA, and
return a dict from model name to a ctypes int8 pointer into the mapping."""
flags = MAP_SHARED
prot = PROT_READ | PROT_WRITE
main_size = 0
if eplb_config.redundant_expert_async_load_model_shmem_size_gb == 0:
main_size = TOTAL_MODEL_SIZE // ep_size
else:
main_size = eplb_config.redundant_expert_async_load_model_shmem_size_gb
main_size = main_size * G
mmap_infos = {}
for name in model_name:
expert_weight_file = f"/dev/shm/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}"
shm_size = main_size
if not os.path.isfile(expert_weight_file):
open(expert_weight_file, "wb").close()
shm_fd = os.open(expert_weight_file, os.O_RDWR)
os.ftruncate(shm_fd, shm_size)
if logger is not None:
logger.info(f"redundant_expert: create_mmap file {expert_weight_file}, fd {shm_fd}, size {shm_size}")
shm_ptr = libc.mmap(0, ctypes.c_size_t(shm_size), prot, flags, shm_fd, 0)
if shm_ptr == MAP_FAILED:
raise OSError(f"redundant_expert: mmap {expert_weight_file} failed: {ctypes.get_errno()}")
shm_ptr = ctypes.cast(shm_ptr, ctypes.POINTER(ctypes.c_int8))
addr = ctypes.addressof(shm_ptr.contents)
if cudart is None:
raise ImportError(
"cuda-python not installed. Install the version matching your CUDA toolkit:\n"
" CUDA 12.x → pip install cuda-python==12.*\n"
)
# Pin the mapping with cudaHostRegister so it can serve as page-locked host memory
(ret,) = cudart.cudaHostRegister(addr, shm_size, 0)
if ret != cudart.cudaError_t.cudaSuccess:
raise RuntimeError(
f"cudaHostRegister failed: {cudart.cudaGetErrorString(ret)}, "
f" address {hex(addr)} size {shm_size}, ret: {ret}"
)
mmap_infos[name] = shm_ptr
return mmap_infos
def save_tensor_to_shm_mem(cached_weights, file_path, logger=None):
"""save_tensor_to_shm_mem"""
tensor_infos = []
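# Each record: (name, byte offset into the shm file, byte size, shape, dtype).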
offset = 0
if not os.path.exists(file_path):
raise OSError(f"{file_path} does not exist")
shm_size = os.path.getsize(file_path)
for name, w in cached_weights:
size = w.numel().item() * w.element_size()
# logger.info(f"redundant_expert: save tensor to {name} offset: {offset} size: {size}")
w_ptr = ctypes.string_at(w.data_ptr(), size)
with open(file_path, "r+b") as file:
file.seek(offset)
if offset + size > shm_size:
raise IOError(
f"redundant_expert: exceeded the size of {file_path}. "
+ "Increase redundant_expert_async_load_model_shmem_size_gb in the EPLB config."
)
n = file.write(w_ptr)
assert n == size
tensor_infos.append((name, offset, size, w.shape, w.dtype))
offset += size
sz = offset / G  # bytes -> GiB
if logger is not None:
logger.info(f"redundant_expert: save_tensor_to_shm_mem success. file {file_path} size {sz}G")
return tensor_infos
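# The (name, offset, size, shape, dtype) records produced above are sent over
# data_conn by load_model_weights_process; the consumer can rebuild the tensors
# from them with the function below, without copying.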
def load_tensor_from_shm_mem(tensor_infos, shm_ptr, logger=None):
"""load_tensor_from_shm_mem"""
# weights_dict = {}
weights_dict = []
for name, offset, size, shape, dtype in tensor_infos:
# Compute the tensor's address inside the shared-memory block
w_addr = ctypes.cast(shm_ptr, ctypes.c_void_p).value + offset
w_ptr = ctypes.cast(w_addr, ctypes.POINTER(ctypes.c_byte))
# Read the bytes first, then reinterpret them as the right dtype via a view (no copy)
np_array = np.ctypeslib.as_array(w_ptr, shape=(size,))
if dtype == paddle.float32:
tmp = np_array.view(np.float32)
tensor = paddle.Tensor(tmp, dtype=paddle.float32, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.uint8:
tmp = np_array.view(np.uint8)
tensor = paddle.Tensor(tmp, dtype=paddle.uint8, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.int8:
tmp = np_array.view(np.int8)
tensor = paddle.Tensor(tmp, dtype=paddle.int8, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.bfloat16:
# NumPy has no bfloat16, so read the raw bits as uint16 and let Paddle interpret the buffer as bfloat16
tmp = np_array.view(np.uint16)
tensor = paddle.Tensor(tmp, dtype=paddle.bfloat16, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.float8_e4m3fn:
tmp = np_array.view(np.uint8)
tensor = paddle.Tensor(tmp, dtype=paddle.float8_e4m3fn, place=paddle.CPUPlace(), zero_copy=True)
else:
raise TypeError(f"Unsupported dtype: {dtype}")
assert w_addr == tensor.data_ptr()
weights_dict.append((name, tensor.view(shape)))
if logger is not None:
logger.info("redundant_expert: load_tensor_from_shm_mem succ")
return weights_dict
class AsyncEPLoader(object):
"""Aynsc Expert loader"""
def __init__(
self,
model_dir,
eplb_config,
rank=8,
expert_per_rank=8,
moe_layer_start_index=3,
moe_quant_type="",
logger=None,
):
"""
__init__
"""
self.model_path = model_dir
self.eplb_config = eplb_config
self.expert_per_rank = expert_per_rank
self.moe_layer_start_index = moe_layer_start_index
self.ep_rank = rank
self.moe_quant_type = moe_quant_type
self.old_model_ep_rank_to_expert_id_list = None
self.new_model_ep_rank_to_expert_id_list = None
self.cached_weights = []
# self.state_dicts = {}
self.moe_file_names = []
self.logger = logger
def reset(self):
"""
reset
"""
self.old_model_ep_rank_to_expert_id_list = None
self.new_model_ep_rank_to_expert_id_list = None
self.cached_weights = []
self.moe_file_names = []
def load_experts_weight_from_disk(self):
"""
return value: (all_succ whether_load_weight exist_fatal_error message),
exist_fatal_error means all rank need restart
"""
ep_rank = self.ep_rank
start_idx = ep_rank * self.expert_per_rank
end_idx = start_idx + self.expert_per_rank
try:
old_expert_ids_all = self.old_model_ep_rank_to_expert_id_list[:, start_idx:end_idx]
new_expert_ids_all = self.new_model_ep_rank_to_expert_id_list[:, start_idx:end_idx]
need_to_reload = list()
for layer_id in range(len(old_expert_ids_all)):
if layer_id < self.moe_layer_start_index:
continue
new_expert_ids = new_expert_ids_all[layer_id]
old_expert_ids = old_expert_ids_all[layer_id]
if len(new_expert_ids) != len(old_expert_ids):
message = f"redundant_expert: new_expert_ids length not equal to old_expert_ids \
length layer_id: {layer_id}"
# this is very dangerous and unepxpected, should be fixed
return False, message
# TODO: load on demand and filter out duplicate experts
self.logger.info(
f"redundant_expert: rank {ep_rank} layer {layer_id} old_experts {old_expert_ids}"
+ f" new_experts {new_expert_ids}"
)
need_to_reload.extend([(layer_id, expert_id) for expert_id in new_expert_ids])
succ = True
message = ""
if len(need_to_reload) > 0:
if self.eplb_config.model_use_safetensors:
succ, message = self.load_safetensor_fp8_from_disk(need_to_reload)
else:
succ, message = self.load_weight_bf16_from_disk(need_to_reload)
if not succ:
new_message = f"redundant_expert: load_experts_weight_from_disk fail. rank {ep_rank}, error: {message}"
self.logger.info(new_message)
return False, new_message
self.logger.info(f"redundant_expert: load_experts_weight_from_disk success. rank {ep_rank}")
return True, "redundant_expert: load_experts_weight_from_disk success"
except Exception as e:
message = f"redundant_expert: Failed to load_experts_weight_from_disk ep_rank {ep_rank} excep: {e}"
error_message = traceback.format_exc()
self.logger.error(f"redundant_expert: message: {message} traceback: {error_message}")
return False, message
def load_weight_bf16_from_disk(self, need_to_reload: List[Tuple[int, int]]):
"""load_weight_bf16_from_disk"""
try:
ckpt_up_gate_proj_name = "up_gate_proj"
ckpt_down_proj_name = "down_proj"
for layer_id, expert_id in need_to_reload:
for weight_name in [ckpt_up_gate_proj_name, ckpt_down_proj_name]:
ckpt_file_name = f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{weight_name}.weight"
if ckpt_file_name not in self.moe_file_names:
self.logger.info(f"record redundant_expert: {ckpt_file_name}")
self.moe_file_names.append(ckpt_file_name)
last_device = paddle.device.get_device()
paddle.set_device("cpu")
for file_name in self.moe_file_names:
# Check whether the per-tensor weight file exists
if not os.path.exists(self.model_path + "/merged_tp1_state_split/" + file_name):
# self.logger.info(f"redundant_expert: {file_name} not exist.")
continue
# self.logger.info(f"redundant_expert: Loading expert weights: {file_name}.")
# self.state_dicts[file_name] = paddle.load(self.model_path + "/merged_tp1_state_split/" + file_name)
paddle.set_device(last_device)
self.logger.info("redundant_expert: Loading expert weights end.")
return True, "redundant_expert: Succeeded to loading expert weights."
except Exception as e:
message = f"redundant_expert: Failed to get weights iterator: {e}."
return False, message
def load_safetensor_fp8_from_disk(self, need_to_reload: List[Tuple[int, int]]):
"""load_safetensor_fp8_from_disk"""
"""
ernie.layers.52.mlp.experts.58.up_gate_proj.quant_weight
ernie.layers.52.mlp.experts.58.up_gate_proj.weight_scale
ernie.layers.52.mlp.experts.58.down_proj.quant_weight
ernie.layers.52.mlp.experts.58.down_proj.weight_scale
"""
up_gate_down = ["up_gate_proj", "down_proj"]
quant_weight_scale = ["quant_weight", "weight_scale"]
ckpt_name = [
(f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{proj_name}.{quant_name}")
for layer_id, expert_id in need_to_reload
for proj_name in up_gate_down
for quant_name in quant_weight_scale
]
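# Four checkpoint tensors per (layer, expert): {up_gate_proj, down_proj} x {quant_weight, weight_scale}.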
ckpt_name_to_safetensor_file = load_ep_checkpoint(self.model_path)
hf_weights_files = list(set(ckpt_name_to_safetensor_file.values()))
state_dicts = {}
last_device = paddle.device.get_device()
paddle.set_device("cpu")
from safetensors import safe_open
for st_file in hf_weights_files:
with safe_open(st_file, framework="paddle", device="cpu") as f:
for name in f.keys():
if name in ckpt_name:
weight = f.get_tensor(name)
state_dicts[name] = paddle.Tensor(weight, zero_copy=True)
weights_list = []
for name in ckpt_name:
weights_list.append((name, state_dicts[name]))
self.cached_weights = weights_list
paddle.set_device(last_device)
return True, "load_expert_weight_from_disk_safetensor success"
def load_ep_checkpoint(model_path):
"""
load ep checkpoint
"""
file_path = os.path.join(model_path, "model.safetensors.index.json")
if not os.path.exists(file_path):
return {}
import json
with open(file_path, "r") as f:
weight_map = json.load(f)["weight_map"]
state_dict = {k: os.path.join(model_path, v) for k, v in weight_map.items()}
return state_dict
def load_model_weights_process(
rank: int,
model_dir: str,
expert_per_rank: int,
moe_layer_start_index: int,
moe_quant_type: str,
shm_uuid: str,
eplb_config: EPLBConfig,
data_conn,
mg_conn,
):
"""
Worker-process entry point: receive placement updates on mg_conn, reload the
affected expert weights from disk, stage them in /dev/shm, and reply on data_conn.
"""
import faulthandler
from setproctitle import setproctitle
setproctitle(f"eplb::async_load_model_{rank}")
faulthandler.enable()
from fastdeploy.utils import get_logger
logger = get_logger("eplb_async_loader", "eplb_{0}.log".format(rank))
logger.info("redundant_expert: load_model_weights_process start")
paddle.set_device("cpu")
ep_loader = AsyncEPLoader(
model_dir=model_dir,
rank=rank,
expert_per_rank=expert_per_rank,
moe_layer_start_index=moe_layer_start_index,
moe_quant_type=moe_quant_type,
logger=logger,
eplb_config=eplb_config,
)
while True:
ep_loader.reset()
data = mg_conn.recv()
result = True
weight_infos = []
try:
ep_loader.old_model_ep_rank_to_expert_id_list = data["old_model_ep_rank_to_expert_id_list"]
ep_loader.new_model_ep_rank_to_expert_id_list = data["new_model_ep_rank_to_expert_id_list"]
begin_time_disk = int(time.time())
success, message = ep_loader.load_experts_weight_from_disk()
begin_time_shm = int(time.time())
logger.info(
"redundant_expert: async load load_weight_from_disk, "
+ f"succ {success}, cost {begin_time_shm-begin_time_disk}s"
)
if success:
model_name = MODEL_MAIN_NAME
file_path = f"/dev/shm/{model_name}_rank_{rank}_expert_weight_{shm_uuid}"
weight_infos = save_tensor_to_shm_mem(ep_loader.cached_weights, file_path, logger)
logger.info(
"redundant_expert: async load save_tensor_to_shm_mem, "
+ f"tensor nums {len(weight_infos)}, cost {int(time.time()-begin_time_shm)}s"
)
else:
logger.error(f"redundant_expert: async load load_weight_from_disk failed, error {message}")
result = False
except Exception as e:
logger.error(f"redundant_expert: async load weights failed, rank {rank} error {e}")
result = False
weight_infos = []
finally:
request_data = {"result": result, "weights": weight_infos}
data_conn.send(request_data)
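# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; the real parent-side wiring lives outside
# this file). Assumed setup: one multiprocessing.Pipe per direction, a spawned
# worker per EP rank, and placement maps shaped like the arrays consumed above.
#
#   import multiprocessing as mp
#   data_parent, data_child = mp.Pipe()
#   mg_parent, mg_child = mp.Pipe()
#   worker = mp.Process(
#       target=load_model_weights_process,
#       args=(rank, model_dir, expert_per_rank, moe_layer_start_index,
#             moe_quant_type, shm_uuid, eplb_config, data_child, mg_child),
#   )
#   worker.start()
#   mg_parent.send({
#       "old_model_ep_rank_to_expert_id_list": old_placement,
#       "new_model_ep_rank_to_expert_id_list": new_placement,
#   })
#   reply = data_parent.recv()  # {"result": bool, "weights": [(name, offset, size, shape, dtype), ...]}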