[Feature] Support eplb for fd (#4599)

* support eplb

---------

Co-authored-by: kevin <chengyf112@gmail.com>
Author: chenjian
Date: 2025-11-03 14:08:15 +08:00
Committed by: GitHub
Parent: c657f8d16a
Commit: f83d0cf127
9 changed files with 1527 additions and 0 deletions

View File

@@ -1118,6 +1118,29 @@ class PoolerConfig:
"""
class EPLBConfig:
"""
Configuration for the EPLB (Expert Parallelism Load Balancer) manager, read from FD_* environment variables.
"""
def __init__(
self,
):
self.enable_redundant_experts = envs.FD_ENABLE_REDUNDANT_EXPERTS
self.redundant_experts_num = envs.FD_REDUNDANT_EXPERTS_NUM
self.redundant_expert_ip_shm_size = envs.FD_REDUNDANT_EXPERT_IP_SHM_SIZE
self.redundant_expert_meta_dir = envs.FD_REDUNDANT_EXPERT_META_DIR
self.redundant_expert_api_user = envs.FD_REDUNDANT_EXPERT_API_USER
self.redundant_expert_api_password = envs.FD_REDUNDANT_EXPERT_API_PASSWORD
self.redundant_expert_eplb_strategy = envs.FD_REDUNDANT_EXPERT_EPLB_STRATEGY
self.redundant_expert_dump_workload_interval = envs.FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL
self.redundant_expert_async_load_model_shmem_size_gb = envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB
self.redundant_expert_enable_schedule_cordon = envs.FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON
self.model_use_safetensors = envs.FD_MODEL_USE_SAFETENSORS
self.model_use_offline_quant = envs.FD_MODEL_USE_OFFLINE_QUANT
self.moe_quant_type = envs.FD_MOE_QUANT_TYPE
class CacheConfig:
"""
Configuration for the KV cache.
@@ -1379,6 +1402,7 @@ class FDConfig:
graph_opt_config: GraphOptimizationConfig = None,
plas_attention_config: PlasAttentionConfig = None,
speculative_config: SpeculativeConfig = None,
eplb_config: EPLBConfig = None,
structured_outputs_config: StructuredOutputsConfig = None,
tokenizer: str = None,
ips: str = None,
@@ -1398,6 +1422,7 @@ class FDConfig:
self.scheduler_config: SchedulerConfig = scheduler_config # type: ignore
self.parallel_config = parallel_config # type: ignore
self.speculative_config: SpeculativeConfig = speculative_config
self.eplb_config: Optional[EPLBConfig] = eplb_config
self.device_config: DeviceConfig = device_config # type: ignore
self.load_config: LoadConfig = load_config
self.quant_config: Optional[QuantConfigBase] = quant_config

View File

@@ -26,6 +26,7 @@ from fastdeploy.config import (
CacheConfig,
ConvertOption,
EarlyStopConfig,
EPLBConfig,
FDConfig,
GraphOptimizationConfig,
LoadConfig,
@@ -1076,6 +1077,8 @@ class EngineArgs:
Create and return a Config object based on the current settings.
"""
all_dict = asdict(self)
eplb_cfg = EPLBConfig()
all_dict["enable_redundant_experts"] = eplb_cfg.enable_redundant_experts
model_cfg = ModelConfig(all_dict)
# XPU currently disable prefix cache for VL model
@@ -1134,6 +1137,7 @@ class EngineArgs:
load_config=load_cfg,
parallel_config=parallel_cfg,
speculative_config=speculative_cfg,
eplb_config=eplb_cfg,
structured_outputs_config=structured_outputs_config,
ips=self.ips,
use_warmup=self.use_warmup,

View File

@@ -128,6 +128,27 @@ environment_variables: dict[str, Callable[[], Any]] = {
"FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")),
# Count for cache_transfer_manager process error
"FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
# EPLB related
"FD_ENABLE_REDUNDANT_EXPERTS": lambda: int(os.getenv("FD_ENABLE_REDUNDANT_EXPERTS", "0")) == 1,
"FD_REDUNDANT_EXPERTS_NUM": lambda: int(os.getenv("FD_REDUNDANT_EXPERTS_NUM", "0")),
"FD_REDUNDANT_EXPERT_IP_SHM_SIZE": lambda: int(os.getenv("FD_REDUNDANT_EXPERT_IP_SHM_SIZE", "1024")),
"FD_REDUNDANT_EXPERT_META_DIR": lambda: os.getenv("FD_REDUNDANT_EXPERT_META_DIR", "/tmp/redundant_expert_meta"),
"FD_REDUNDANT_EXPERT_API_USER": lambda: os.getenv("FD_REDUNDANT_EXPERT_API_USER", ""),
"FD_REDUNDANT_EXPERT_API_PASSWORD": lambda: os.getenv("FD_REDUNDANT_EXPERT_API_PASSWORD", ""),
"FD_REDUNDANT_EXPERT_EPLB_STRATEGY": lambda: os.getenv("FD_REDUNDANT_EXPERT_EPLB_STRATEGY", ""),
"FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL": lambda: int(
os.getenv("FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL", "10")
),
"FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB": lambda: int(
os.getenv("FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB", "0")
),
"FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON": lambda: int(
os.getenv("FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON", "1")
)
== 1,
"FD_MODEL_USE_SAFETENSORS": lambda: int(os.getenv("FD_MODEL_USE_SAFETENSORS", "1")) == 1,
"FD_MODEL_USE_OFFLINE_QUANT": lambda: int(os.getenv("FD_MODEL_USE_OFFLINE_QUANT", "1")) == 1,
"FD_MOE_QUANT_TYPE": lambda: os.getenv("FD_MOE_QUANT_TYPE", "w4a8"),
"ENCODE_FEATURE_BOS_AK": lambda: os.getenv("ENCODE_FEATURE_BOS_AK"),
"ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"),
}
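A minimal usage sketch (not part of this diff): the FD_* variables above are read when an EPLBConfig is constructed, so setting them in the process environment beforehand is enough to turn redundant experts on. The values below are illustrative.
import os
os.environ["FD_ENABLE_REDUNDANT_EXPERTS"] = "1"  # enable EPLB
os.environ["FD_REDUNDANT_EXPERTS_NUM"] = "8"  # eight redundant expert slots
os.environ["FD_REDUNDANT_EXPERT_EPLB_STRATEGY"] = "balance_intra_node"  # see eplb.py below
from fastdeploy.config import EPLBConfig
cfg = EPLBConfig()
assert cfg.enable_redundant_experts and cfg.redundant_experts_num == 8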

View File

@@ -0,0 +1,3 @@
""" "
Expert Parallelism Load Balancer (EPLB)
"""

View File

@@ -0,0 +1,405 @@
"""AsyncExpertLoader async load the model weights of the MoE experts."""
import ctypes
import os
import time
import traceback
from typing import List, Tuple
import numpy as np
import paddle
from fastdeploy import envs
REARRANGE_EXPERT_MAGIC_NUM = 147183647
REARRANGE_ORIGINATOR_EP_RANK = 0
CHECK_TIME_INTERVAL = 3
HTTP_RETRY_NUM = 5
CHECK_TIMEOUT = 120
libc = ctypes.CDLL(None)
libc.mmap.argtypes = [
ctypes.c_void_p, # void *addr
ctypes.c_size_t, # size_t length
ctypes.c_int, # int prot
ctypes.c_int, # int flags
ctypes.c_int, # int fd
ctypes.c_size_t, # off_t offset
]
libc.mmap.restype = ctypes.c_void_p
libc.munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
libc.munmap.restype = ctypes.c_int
PROT_READ = 0x1
PROT_WRITE = 0x2
MAP_SHARED = 0x01
MAP_ANONYMOUS = 0x20
MAP_FAILED = ctypes.c_void_p(-1).value  # mmap(2) returns (void *)-1 on failure
G = 1024**3
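# Sizes below are in GiB; TOTAL_MODEL_SIZE is the assumed full-model weight footprint,
# used to size the per-rank shared-memory file when FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB is 0.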
TOTAL_MODEL_SIZE = 350
MAIN_MODEL_REDUNDANT_SHM_SIZE = 5
MODEL_MAIN_NAME = "eplb_main"
def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, logger=None):
"""create_mmap"""
flags = MAP_SHARED
prot = PROT_READ | PROT_WRITE
main_size = 0
if envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB == 0:
main_size = TOTAL_MODEL_SIZE // ep_size
else:
main_size = envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB
main_size = main_size * G
mmap_infos = {}
from cuda import cudart
for name in model_name:
expert_weight_file = f"/dev/shm/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}"
shm_size = main_size
if not os.path.isfile(expert_weight_file):
open(expert_weight_file, "wb").close()
shm_fd = os.open(expert_weight_file, os.O_RDWR)
os.ftruncate(shm_fd, shm_size)
if logger is not None:
logger.info(
f"redundant_expert: create_mmap file {expert_weight_file}, \
fd {shm_fd}, size {shm_size}"
)
shm_ptr = libc.mmap(0, ctypes.c_size_t(shm_size), prot, flags, shm_fd, 0)
if shm_ptr == MAP_FAILED:
raise OSError(f"redundant_expert: mmap {expert_weight_file} failed: {ctypes.get_errno()}")
shm_ptr = ctypes.cast(shm_ptr, ctypes.POINTER(ctypes.c_int8))
addr = ctypes.addressof(shm_ptr.contents)
# Register memory with CUDA
(ret,) = cudart.cudaHostRegister(addr, shm_size, 0)
if ret != cudart.cudaError_t.cudaSuccess:
raise RuntimeError(
f"cudaHostRegister failed: {cudart.cudaGetErrorString(ret)},"
+ f" address {hex(addr)} size {shm_size}, ret: {ret}"
)
mmap_infos[name] = shm_ptr
return mmap_infos
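# Note: create_mmap returns {model_name: ctypes pointer}; each pointer addresses a
# cudaHostRegister'ed /dev/shm mapping, and callers hand mmap_infos[MODEL_MAIN_NAME]
# to load_tensor_from_shm_mem when swapping weights in (see the worker changes below).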
def save_tensor_to_shm_mem(cached_weights, file_path, logger=None):
"""save_tensor_to_shm_mem"""
tensor_infos = []
offset = 0
if not os.path.exists(file_path):
raise OSError("File is not exist")
shm_size = os.path.getsize(file_path)
for name, w in cached_weights:
size = w.numel().item() * w.element_size()
# logger.info(f"redundant_expert: save tensor to {name} offset: {offset} size: {size}")
w_ptr = ctypes.string_at(w.data_ptr(), size)
with open(file_path, "r+b") as file:
file.seek(offset)
if offset + size > shm_size:
raise IOError(
f"redundant_expert: Exceeded {file_path} file's size. "
+ "Should set a bigger value using env variable."
)
n = file.write(w_ptr)
assert n == size
tensor_infos.append((name, offset, size, w.shape, w.dtype))
offset += size
sz = offset / 1024 / 1024 / 1024
if logger is not None:
logger.info(f"redundant_expert: save_tensor_to_shm_mem success. file {file_path} size {sz}G")
return tensor_infos
def load_tensor_from_shm_mem(tensor_infos, shm_ptr, logger=None):
"""load_tensor_from_shm_mem"""
# weights_dict = {}
weights_dict = []
for name, offset, size, shape, dtype in tensor_infos:
# Compute the tensor's address inside the shared-memory block
w_addr = ctypes.cast(shm_ptr, ctypes.c_void_p).value + offset
w_ptr = ctypes.cast(w_addr, ctypes.POINTER(ctypes.c_byte))
# Read the bytes first, then reinterpret them via a typed view
np_array = np.ctypeslib.as_array(w_ptr, shape=(size,))
if dtype == paddle.float32:
tmp = np_array.view(np.float32)
tensor = paddle.Tensor(tmp, dtype=paddle.float32, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.uint8:
tmp = np_array.view(np.uint8)
tensor = paddle.Tensor(tmp, dtype=paddle.uint8, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.int8:
tmp = np_array.view(np.int8)
tensor = paddle.Tensor(tmp, dtype=paddle.int8, place=paddle.CPUPlace(), zero_copy=True)
elif dtype == paddle.bfloat16:
# NumPy has no bfloat16, so read the raw data as uint16 and let Paddle interpret it as bfloat16
tmp = np_array.view(np.uint16)
tensor = paddle.Tensor(tmp, dtype=paddle.bfloat16, place=paddle.CPUPlace(), zero_copy=True)
else:
raise TypeError(f"Unsupported dtype: {dtype}")
assert w_addr == tensor.data_ptr()
# weights_dict[name] = tensor.view(shape)
weights_dict.append((name, tensor.view(shape)))
if logger is not None:
logger.info("redundant_expert: load_tensor_from_shm_mem succ")
return weights_dict
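A hedged round-trip sketch for the two helpers above, assuming a CPU Paddle build, a writable /tmp and that the zero-copy tensor construction behaves as the loader expects (the production path instead uses create_mmap over /dev/shm plus CUDA host registration); the file name and sizes are illustrative only.
import ctypes
import mmap
import paddle
from fastdeploy.eplb.async_expert_loader import load_tensor_from_shm_mem, save_tensor_to_shm_mem
paddle.set_device("cpu")
demo_path = "/tmp/eplb_demo_shm"  # hypothetical backing file
with open(demo_path, "wb") as f:
    f.truncate(1 << 20)  # 1 MiB is plenty for the demo tensor
w = paddle.ones([4, 4], dtype="float32")
infos = save_tensor_to_shm_mem([("demo.weight", w)], demo_path)  # [(name, offset, size, shape, dtype)]
with open(demo_path, "r+b") as f:
    mm = mmap.mmap(f.fileno(), 0)
    base = ctypes.cast(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(mm))), ctypes.POINTER(ctypes.c_int8))
    restored = load_tensor_from_shm_mem(infos, base)  # [("demo.weight", tensor)]
    assert (restored[0][1].numpy() == 1.0).all()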
class AsyncEPLoader(object):
"""Aynsc Expert loader"""
def __init__(
self,
model_dir="",  # checkpoint directory; must be provided when expert weights are loaded from disk
rank=8,
expert_per_rank=8,
moe_layer_start_index=3,
moe_quant_type="",
logger=None,
):
"""
__init__
"""
self.model_path = model_dir
self.expert_per_rank = expert_per_rank
self.moe_layer_start_index = moe_layer_start_index
self.ep_rank = rank
self.moe_quant_type = moe_quant_type
self.old_model_ep_rank_to_expert_id_list = None
self.new_model_ep_rank_to_expert_id_list = None
self.cached_weights = []
self.state_dicts = {}
self.moe_file_names = []
self.logger = logger
def reset(self):
"""
reset
"""
self.old_model_ep_rank_to_expert_id_list = None
self.new_model_ep_rank_to_expert_id_list = None
self.cached_weights = []
self.state_dicts = {}
self.moe_file_names = []
def load_experts_weight_from_disk(self):
"""
Reload the expert weights this rank needs after a rearrangement.
Returns a (success, message) tuple; a fatal failure (e.g. mismatched expert lists) means all ranks need to restart.
"""
ep_rank = self.ep_rank
start_idx = ep_rank * self.expert_per_rank
end_idx = start_idx + self.expert_per_rank
try:
old_expert_ids_all = self.old_model_ep_rank_to_expert_id_list[:, start_idx:end_idx]
new_expert_ids_all = self.new_model_ep_rank_to_expert_id_list[:, start_idx:end_idx]
need_to_reload = list()
for layer_id in range(len(old_expert_ids_all)):
if layer_id < self.moe_layer_start_index:
continue
new_expert_ids = new_expert_ids_all[layer_id]
old_expert_ids = old_expert_ids_all[layer_id]
if len(new_expert_ids) != len(old_expert_ids):
message = f"redundant_expert: new_expert_ids length not equal to old_expert_ids \
length layer_id: {layer_id}"
# this is unexpected and dangerous; it must be fixed
return False, message
# TODO: load on demand and skip experts already resident on this rank
self.logger.info(
f"redundant_expert: rank {ep_rank} layer {layer_id} old_experts {old_expert_ids}"
+ f" new_experts {new_expert_ids}"
)
need_to_reload.extend([(layer_id, expert_id) for expert_id in new_expert_ids])
succ = True
message = ""
if len(need_to_reload) > 0:
if envs.FD_MODEL_USE_SAFETENSORS:
succ, message = self.load_safetensor_fp8_from_disk(need_to_reload)
else:
succ, message = self.load_weight_bf16_from_disk(need_to_reload)
if not succ:
self.logger.info(
f"redundant_expert: load_experts_weight_from_disk fail. rank {ep_rank}, error: {message}"
)
new_message = f"redundant_expert: load_experts_weight_from_disk fail. rank {ep_rank}, error: {message}"
return False, new_message
self.logger.info(f"redundant_expert: load_experts_weight_from_disk success. rank {ep_rank}")
return True, "redundant_expert: load_experts_weight_from_disk success"
except Exception as e:
message = f"redundant_expert: Failed to load_experts_weight_from_disk ep_rank {ep_rank} excep: {e}"
error_message = traceback.format_exc()
self.logger.error(f"redundant_expert: message: {message} traceback: {error_message}")
return False, message
def load_weight_bf16_from_disk(self, need_to_reload: List[Tuple[int, int]]):
"""load_weight_bf16_from_disk"""
try:
ckpt_up_gate_proj_name = "up_gate_proj"
ckpt_down_proj_name = "down_proj"
for layer_id, expert_id in need_to_reload:
for weight_name in [ckpt_up_gate_proj_name, ckpt_down_proj_name]:
ckpt_file_name = f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{weight_name}.weight"
if ckpt_file_name not in self.moe_file_names:
self.logger.info(f"record redundant_expert: {ckpt_file_name}")
self.moe_file_names.append(ckpt_file_name)
last_device = paddle.device.get_device()
paddle.set_device("cpu")
for file_name in self.moe_file_names:
# Skip weights whose files are missing on disk
if not os.path.exists(self.model_path + "/merged_tp1_state_split/" + file_name):
# self.logger.info(f"redundant_expert: {file_name} not exist.")
continue
# self.logger.info(f"redundant_expert: Loading expert weights: {file_name}.")
self.state_dicts[file_name] = paddle.load(self.model_path + "/merged_tp1_state_split/" + file_name)
paddle.set_device(last_device)
self.logger.info("redundant_expert: Loading expert weights end.")
return True, "redundant_expert: Succeeded to loading expert weights."
except Exception as e:
message = f"redundant_expert: Failed to get weights iterator: {e}."
return False, message
def load_safetensor_fp8_from_disk(self, need_to_reload: List[Tuple[int, int]]):
"""load_safetensor_fp8_from_disk"""
"""
ernie.layers.52.mlp.experts.58.up_gate_proj.quant_weight
ernie.layers.52.mlp.experts.58.up_gate_proj.weight_scale
ernie.layers.52.mlp.experts.58.down_proj.quant_weight
ernie.layers.52.mlp.experts.58.down_proj.weight_scale
"""
up_gate_down = ["up_gate_proj", "down_proj"]
quant_weight_scale = ["quant_weight", "weight_scale"]
if self.moe_quant_type == "w4a8":
quant_weight_scale = ["quant_weight"]
ckpt_name = [
(f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{proj_name}.{quant_name}")
for layer_id, expert_id in need_to_reload
for proj_name in up_gate_down
for quant_name in quant_weight_scale
]
ckpt_name_to_safetensor_file = load_ep_checkpoint(self.model_path)
hf_weights_files = list(set(ckpt_name_to_safetensor_file.values()))
state_dicts = {}
last_device = paddle.device.get_device()
paddle.set_device("cpu")
from safetensors import safe_open
for st_file in hf_weights_files:
with safe_open(st_file, framework="np", device="cpu") as f:
for name in f.keys():
if name in ckpt_name:
weight = f.get_tensor(name)
state_dicts[name] = paddle.Tensor(weight, zero_copy=True)
weights_list = []
for name in ckpt_name:
weights_list.append((name, state_dicts[name]))
self.cached_weights = weights_list
paddle.set_device(last_device)
return True, "load_expert_weight_from_disk_safetensor success"
def load_ep_checkpoint(model_path):
"""
Map checkpoint tensor names to their safetensors shard files via model.safetensors.index.json.
"""
file_path = os.path.join(model_path, "model.safetensors.index.json")
if not os.path.exists(file_path):
return {}
import json
with open(file_path, "r") as f:
weight_map = json.load(f)["weight_map"]
state_dict = {k: os.path.join(model_path, v) for k, v in weight_map.items()}
return state_dict
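# For reference, model.safetensors.index.json maps tensor names to shard files, roughly
# (illustrative excerpt; shard file names depend on the checkpoint):
#   {"weight_map": {"ernie.layers.52.mlp.experts.58.up_gate_proj.quant_weight": "model-00001-of-000xx.safetensors", ...}}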
def load_model_weights_process(
rank: int, expert_per_rank: int, moe_layer_start_index: int, moe_quant_type: str, data_conn, mg_conn, shm_uuid
):
"""
Loader subprocess entry point: wait for rearrangement requests on mg_conn, load the required expert weights from disk, stage them in shared memory and answer on data_conn.
"""
import faulthandler
from setproctitle import setproctitle
setproctitle(f"eplb::async_load_model_{rank}")
faulthandler.enable()
from fastdeploy.utils import get_logger
logger = get_logger("eplb_async_loader", "eplb_{0}.log".format(rank))
logger.info("redundant_expert: load_model_weights_process start")
paddle.set_device("cpu")
ep_loader = AsyncEPLoader(
rank=rank,
expert_per_rank=expert_per_rank,
moe_layer_start_index=moe_layer_start_index,
moe_quant_type=moe_quant_type,
logger=logger,
)
while True:
ep_loader.reset()
data = mg_conn.recv()
result = True
weight_infos = []
try:
ep_loader.old_model_ep_rank_to_expert_id_list = data["old_model_ep_rank_to_expert_id_list"]
ep_loader.new_model_ep_rank_to_expert_id_list = data["new_model_ep_rank_to_expert_id_list"]
begin_time_disk = int(time.time())
success, message = ep_loader.load_experts_weight_from_disk()
begin_time_shm = int(time.time())
logger.info(
"redundant_expert: async load load_weight_from_disk, "
+ f"succ {success}, cost {begin_time_shm-begin_time_disk}s"
)
if success:
model_name = MODEL_MAIN_NAME
file_path = f"/dev/shm/{model_name}_rank_{rank}_expert_weight_{shm_uuid}"
weight_infos = save_tensor_to_shm_mem(ep_loader.cached_weights, file_path, logger)
logger.info(
"redundant_expert: async load save_tensor_to_shm_mem, "
+ f"tensor nums {len(weight_infos)}, cost {int(time.time()-begin_time_shm)}s"
)
else:
logger.error(f"redundant_expert: async load load_weight_from_disk failed, error {message}")
result = False
except Exception as e:
logger.error(f"redundant_expert: async load weights failed, rank {rank} error {e}")
result = False
weight_infos = []
finally:
request_data = {"result": result, "weights": weight_infos}
data_conn.send(request_data)
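# Pipe protocol, as implemented above: the manager sends
#   {"old_model_ep_rank_to_expert_id_list": ndarray, "new_model_ep_rank_to_expert_id_list": ndarray}
# over mg_conn; the loader replies on data_conn with
#   {"result": bool, "weights": [(name, offset, size, shape, dtype), ...]},
# where the offsets index into the /dev/shm file written by save_tensor_to_shm_mem.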

fastdeploy/eplb/eplb.py (new file, 288 lines)
View File

@@ -0,0 +1,288 @@
"""Expert Parallelism Load Balancer (EPLB)"""
from typing import Tuple
import numpy as np
def balanced_packing(weight: np.ndarray, num_packs: int) -> Tuple[np.ndarray, np.ndarray]:
"""
Pack n weighted objects into m packs so that each pack contains exactly n/m objects and the total weights of all packs
are as balanced as possible.
Parameters:
weight: [X, n], the weight of each item
num_packs: number of packs
Returns:
pack_index: [X, n], the pack index of each item
rank_in_pack: [X, n], the rank of the item in the pack
"""
num_layers, num_groups = weight.shape
assert num_groups % num_packs == 0
groups_per_pack = num_groups // num_packs
if groups_per_pack == 1:
pack_index = np.arange(weight.shape[-1], dtype=np.int32).reshape(1, -1).repeat(num_layers, axis=0)
rank_in_pack = np.zeros_like(weight, dtype=np.int32)
return pack_index, rank_in_pack
indices = np.argsort(-weight.astype(np.float32), axis=-1)
pack_index = np.full_like(weight, fill_value=-1, dtype=np.int32)
rank_in_pack = np.full_like(pack_index, fill_value=-1)
for i in range(num_layers):
pack_weights = [0] * num_packs
pack_items = [0] * num_packs
for group in indices[i]:
pack = min(
(p for p in range(num_packs) if pack_items[p] < groups_per_pack),
key=pack_weights.__getitem__,
)
assert pack_items[pack] < groups_per_pack
pack_index[i, group] = pack
rank_in_pack[i, group] = pack_items[pack]
pack_weights[pack] += weight[i, group]
pack_items[pack] += 1
return pack_index, rank_in_pack
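A small worked example of the greedy packing above (illustrative only, not part of this diff):
import numpy as np
from fastdeploy.eplb.eplb import balanced_packing
demo_weight = np.array([[5, 1, 3, 2]])  # one layer, four groups
pack_index, rank_in_pack = balanced_packing(demo_weight, num_packs=2)
# heaviest group first: group 0 -> pack 0, group 2 -> pack 1,
# group 3 -> pack 1 (lighter pack), group 1 -> pack 0 (pack 1 is full)
assert pack_index.tolist() == [[0, 0, 1, 1]]
assert rank_in_pack.tolist() == [[0, 1, 0, 1]]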
def replicate_experts(weight: np.ndarray, num_phy: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Replicate `num_log` experts to `num_phy` replicas, such that the maximum load of all replicas is minimized.
Parameters:
weight: [X, num_log]
num_phy: total number of experts after replication
Returns:
phy2log: [X, num_phy], logical expert id of each physical expert
rank: [X, num_phy], the replica rank
logcnt: [X, num_log], number of replicas for each logical expert
"""
n, num_log = weight.shape
num_redundant = num_phy - num_log
assert num_redundant >= 0
phy2log = np.arange(num_phy, dtype=np.int32).reshape(1, -1).repeat(n, axis=0)
rank = np.zeros((n, num_phy), dtype=np.int32)
logcnt = np.ones((n, num_log), dtype=np.int32)
arangen = np.arange(n, dtype=np.int32)
for i in range(num_log, num_phy):
redundant_indices = np.argmax(weight / logcnt, axis=-1)
phy2log[:, i] = redundant_indices
rank[:, i] = logcnt[arangen, redundant_indices]
logcnt[arangen, redundant_indices] += 1
return phy2log, rank, logcnt
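Another small worked example (illustrative only): with one hot logical expert, the redundant slots all replicate it.
import numpy as np
from fastdeploy.eplb.eplb import replicate_experts
demo_weight = np.array([[4.0, 1.0, 1.0]])  # one layer, three logical experts
phy2log, rank, logcnt = replicate_experts(demo_weight, num_phy=5)
# both extra physical slots go to expert 0, which has the highest load per replica
assert phy2log.tolist() == [[0, 1, 2, 0, 0]]
assert rank.tolist() == [[0, 0, 0, 1, 2]]
assert logcnt.tolist() == [[3, 1, 1]]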
def rebalance_experts_intra_node(
weight: np.ndarray,
num_physical_experts: int,
num_groups: int,
num_nodes: int,
num_gpus: int,
):
"""
Parameters:
weight: [num_moe_layers, num_logical_experts]
num_physical_experts: number of physical experts after replication
num_groups: number of expert groups
num_nodes: number of server nodes, where the intra-node network (e.g., NVLink) is faster
num_gpus: number of GPUs, must be a multiple of `num_nodes`
Returns:
physical_to_logical_map: [num_moe_layers, num_physical_experts]
physical_rank: [num_moe_layers, num_physical_experts], the replica rank of each physical expert
logical_count: [num_moe_layers, num_logical_experts]
"""
num_layers, num_logical_experts = weight.shape
assert num_logical_experts % num_groups == 0
num_redundant_experts = num_physical_experts - num_logical_experts
assert num_redundant_experts >= 0
assert num_gpus % num_nodes == 0
num_gpus_per_node = num_gpus // num_nodes
assert num_physical_experts % num_gpus == 0
num_physical_experts_per_gpu = num_physical_experts // num_gpus
assert num_physical_experts % num_nodes == 0
num_physical_experts_per_node = num_physical_experts // num_nodes
assert num_logical_experts % num_physical_experts_per_node == 0
# num_logical_nodes = num_logical_experts // num_physical_experts_per_node
assert num_redundant_experts % num_physical_experts_per_node == 0
# num_redundant_nodes = num_redundant_experts // num_physical_experts_per_node
def inverse(perm: np.ndarray) -> np.ndarray:
inv = np.empty_like(perm)
inv[np.arange(perm.shape[0])[:, None], perm] = np.arange(perm.shape[1], dtype=np.int32).reshape(1, -1)
return inv
# Step 1: generate redundant experts by weight.
# shape of tmp2log, tmprank is [num_layers, num_physical_experts]
# shape of logcnt is [num_layers, num_logical_experts]
tmp2log, tmprank, logcnt = replicate_experts(weight, num_physical_experts)
# Step 2: compute num_tokens of physical experts
# shape of tokens_per_tmp is [num_layers * num_nodes, num_physical_experts_per_node]
tokens_per_tmp = np.take_along_axis(weight / logcnt, tmp2log, axis=-1).reshape(-1, num_physical_experts_per_node)
# STEP 3: balance load across the GPUs within each node
# shape of gpu_index, rank_in_gpu, tmp2phy, phy2tmp is [num_layers * num_nodes, num_physical_experts_per_node]
gpu_index, rank_in_gpu = balanced_packing(tokens_per_tmp, num_gpus_per_node)
tmp2phy = gpu_index * num_physical_experts_per_gpu + rank_in_gpu
phy2tmp = inverse(tmp2phy)
# STEP 4: generate final phy2log mapping
tmp2log = tmp2log.reshape(-1, num_physical_experts_per_node)
tmprank = tmprank.reshape(-1, num_physical_experts_per_node)
phy2log = np.take_along_axis(tmp2log, phy2tmp, axis=-1).reshape(-1, num_physical_experts)
phyrank = np.take_along_axis(tmprank, phy2tmp, axis=-1).reshape(-1, num_physical_experts)
return phy2log, phyrank, logcnt
def rebalance_experts_hierarchical(
weight: np.ndarray,
num_physical_experts: int,
num_groups: int,
num_nodes: int,
num_gpus: int,
):
"""
Parameters:
weight: [num_moe_layers, num_logical_experts]
num_physical_experts: number of physical experts after replication
num_groups: number of expert groups
num_nodes: number of server nodes, where the intra-node network (e.g., NVLink) is faster
num_gpus: number of GPUs, must be a multiple of `num_nodes`
Returns:
physical_to_logical_map: [num_moe_layers, num_physical_experts]
physical_rank: [num_moe_layers, num_physical_experts], the replica rank of each physical expert
logical_count: [num_moe_layers, num_logical_experts]
"""
num_layers, num_logical_experts = weight.shape
assert num_logical_experts % num_groups == 0
group_size = num_logical_experts // num_groups
assert num_groups % num_nodes == 0
groups_per_node = num_groups // num_nodes
assert num_gpus % num_nodes == 0
assert num_physical_experts % num_gpus == 0
phy_experts_per_gpu = num_physical_experts // num_gpus
def inverse(perm: np.ndarray) -> np.ndarray:
inv = np.empty_like(perm)
inv[np.arange(perm.shape[0])[:, None], perm] = np.arange(perm.shape[1], dtype=np.int32).reshape(1, -1)
return inv
# Step 1: pack groups to nodes
tokens_per_group = weight.reshape(num_layers, num_groups, group_size).sum(axis=-1)
group_pack_index, group_rank_in_pack = balanced_packing(tokens_per_group, num_nodes)
log2mlog = (
((group_pack_index * groups_per_node + group_rank_in_pack) * group_size)[:, :, None]
+ np.arange(group_size, dtype=np.int32)
).reshape(num_layers, -1)
mlog2log = inverse(log2mlog)
# Step 2: construct redundant experts within nodes
tokens_per_mlog = np.take_along_axis(weight, mlog2log, axis=-1).reshape(-1, num_logical_experts // num_nodes)
phy2mlog, phyrank, mlogcnt = replicate_experts(tokens_per_mlog, num_physical_experts // num_nodes)
# Step 3: pack physical_experts to GPUs
tokens_per_phy = np.take_along_axis(tokens_per_mlog / mlogcnt, phy2mlog, axis=-1)
pack_index, rank_in_pack = balanced_packing(tokens_per_phy, num_gpus // num_nodes)
phy2pphy = pack_index * phy_experts_per_gpu + rank_in_pack
pphy2phy = inverse(phy2pphy)
pphy2mlog = np.take_along_axis(phy2mlog, pphy2phy, axis=-1) # [num_layers * num_nodes, num_log_per_nodes]
pphy2mlog = (
pphy2mlog.reshape(num_layers, num_nodes, -1)
+ np.arange(0, num_logical_experts, num_logical_experts // num_nodes, dtype=np.int32).reshape(1, -1, 1)
).reshape(num_layers, -1)
pphy2log = np.take_along_axis(mlog2log, pphy2mlog, axis=-1)
pphyrank = np.take_along_axis(phyrank, pphy2phy, axis=-1).reshape(num_layers, -1)
logcnt = np.take_along_axis(mlogcnt.reshape(num_layers, -1), log2mlog, axis=-1)
return pphy2log, pphyrank, logcnt
def rebalance_experts(
weight: np.ndarray,
num_replicas: int,
num_groups: int,
num_nodes: int,
num_gpus: int,
eplb_strategy: str = "",
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Entry point for expert-parallelism load balancer.
Parameters:
weight: [layers, num_logical_experts], the load statistics for all logical experts
num_replicas: number of physical experts, must be a multiple of `num_gpus`
num_groups: number of expert groups
num_nodes: number of server nodes, where the intra-node network (e.g., NVLink) is faster
num_gpus: number of GPUs, must be a multiple of `num_nodes`
Returns:
physical_to_logical_map: [layers, num_replicas], the expert index of each replica
logical_to_physical_map: [layers, num_logical_experts, X], the replica indices for each expert
expert_count: [layers, num_logical_experts], number of physical replicas for each logical expert
"""
num_layers, num_logical_experts = weight.shape
weight = weight.astype(np.float32)
if eplb_strategy == "balance_intra_node":
phy2log, phyrank, logcnt = rebalance_experts_intra_node(weight, num_replicas, num_groups, num_nodes, num_gpus)
else:
if num_groups % num_nodes == 0:
# use hierarchical load-balance policy
phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
weight, num_replicas, num_groups, num_nodes, num_gpus
)
else:
# use global load-balance policy
phy2log, phyrank, logcnt = replicate_experts(weight, num_replicas)
maxlogcnt = logcnt.max()
log2phy = np.full((num_layers, num_logical_experts, maxlogcnt), -1, dtype=np.int32)
np.put_along_axis(
log2phy.reshape(num_layers, -1)[:, :, None],
(phy2log * maxlogcnt + phyrank)[:, :, None],
np.arange(num_replicas, dtype=np.int32).reshape(1, -1).repeat(num_layers, axis=0)[:, :, None],
axis=1,
)
return phy2log, log2phy, logcnt
__all__ = ["rebalance_experts"]
def main():
"""
Small self-check: rebalance random per-expert token counts and print the resulting maps.
"""
num_hidden_layers = 3
num_expert = 64
num_groups = 8
num_replicas = 64
num_nodes = 4
num_gpus = 4 * 8
# model_tokens_per_expert_stats_list = np.ones(
# (num_hidden_layers, num_expert), dtype=int)
model_tokens_per_expert_stats_list = np.random.randint(low=1, high=10, size=(num_hidden_layers, num_expert))
phy2log, phyrank, logcnt = rebalance_experts(
model_tokens_per_expert_stats_list,
num_replicas,
num_groups,
num_nodes,
num_gpus,
)
print(phy2log)
print(phyrank)
print(logcnt)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,602 @@
"""
Redundant expert manager: coordinates expert workload collection, placement recomputation and asynchronous weight reloads across DP ranks.
"""
import os
import threading
import time
from http import HTTPStatus
from multiprocessing import Pipe, Process, shared_memory
import numpy as np
import requests
from fastdeploy.eplb.async_expert_loader import load_model_weights_process
from fastdeploy.eplb.eplb import rebalance_experts
from fastdeploy.eplb.utils import RearrangeExpertState, RedundantExpertWorkload
from fastdeploy.utils import envs, get_logger
class RedundantExpertManager:
"""
Manages redundant experts: gathers per-expert token statistics, recomputes the placement tables via rebalance_experts and drives asynchronous weight reloads.
"""
def __init__(self, rank=0, ep_size=32, fd_config=None):
self.logger = get_logger("eplb_expert_manager", "eplb_{0}.log".format(rank))
self.rank = rank
self.ep_size = ep_size
self.fd_config = fd_config
self.eplb_config = fd_config.eplb_config
self.api_user = self.eplb_config.redundant_expert_api_user
self.api_passwd = self.eplb_config.redundant_expert_api_password
self.num_hidden_layers = self.fd_config.model_config.num_hidden_layers
self.num_logical_experts = self.fd_config.model_config.moe_num_experts
self.num_redundant_experts = self.eplb_config.redundant_experts_num
self.num_replicas = self.num_logical_experts + self.num_redundant_experts
self.num_groups = self.num_logical_experts
self.num_nodes = max(ep_size // 8, 1)
self.num_gpus = ep_size
self.expert_per_rank = self.num_replicas // ep_size
assert (
self.num_replicas % ep_size == 0
), f"num_replicas must be divisible by ep_size, \
but got num_replicas = {self.num_replicas}, ep_size = {ep_size}"
self.model_ep_rank_to_expert_id_list = np.full(
(
self.num_hidden_layers,
self.num_logical_experts + self.num_redundant_experts,
),
-1,
dtype=np.int32,
)
self.model_expert_id_to_ep_rank_array = np.full(
(
self.num_hidden_layers,
self.num_logical_experts,
self.num_redundant_experts + 1,
),
-1,
dtype=np.int32,
)
self.model_expert_in_rank_num_list = np.zeros(
(self.num_hidden_layers, self.num_logical_experts), dtype=np.int32
)
# backup info
self.last_model_ep_rank_to_expert_id_list = np.full(
(
self.num_hidden_layers,
self.num_logical_experts + self.num_redundant_experts,
),
-1,
dtype=np.int32,
)
self.last_model_expert_id_to_ep_rank_array = np.full(
(
self.num_hidden_layers,
self.num_logical_experts,
self.num_redundant_experts + 1,
),
-1,
dtype=np.int32,
)
self.last_model_expert_in_rank_num_list = np.zeros(
(self.num_hidden_layers, self.num_logical_experts), dtype=np.int32
)
self.model_tokens_per_expert_stats_list = np.ones(
(self.num_hidden_layers, self.num_logical_experts), dtype=np.int32
)
self.calculate_expert_rank_table(True)
self.dp_rank_address = None
self.need_allgather_load_weight_result = False
self.load_weight_begin_ts = 0
self.load_weight_timeout = 300 # 5min
self.need_rearrange_expert = False
self.need_update_expert_tokens_stat = True
self.http_timeout = 1
# Reset rearrangement state: 'done' -> 'free'
self.rearrange_end_ts = 0
self.rearrange_reset_interval = 300
self.tensor_infos = None
self.parent_data_conn, child_data_conn = Pipe()
self.parent_mg_conn, child_mg_conn = Pipe()
Process(
target=load_model_weights_process,
name=f"eplb::async_load_model_{rank}",
args=(
self.rank,
self.expert_per_rank,
self.fd_config.model_config.moe_layer_start_index,
self.eplb_config.moe_quant_type,
child_data_conn,
child_mg_conn,
os.getenv("SHM_UUID", ""),
),
).start()
child_data_conn.close()
child_mg_conn.close()
listen_signal_thread = threading.Thread(target=self.listen_rearrange_expert_signal, args=(), daemon=True)
listen_signal_thread.start()
self.logger.info(
f"redundant_expert: RedundantExpertManager init success, rank {rank}, \
strategy {self.eplb_config.redundant_expert_eplb_strategy}"
)
def get_unique_name(self, name):
return f"{envs.get_unique_name(name + '_dprank_' + str(self.rank))}"
def get_ep_rank_to_expert_id_list(self):
"""
get_ep_rank_to_expert_id_list
"""
return (
self.model_ep_rank_to_expert_id_list,
self.model_expert_id_to_ep_rank_array,
self.model_expert_in_rank_num_list,
)
def listen_rearrange_expert_signal(self):
"""
listen_rearrange_expert_signal
"""
if self.rank == 0:
rearrange_experts_ips_size = np.zeros([1], dtype=np.int32)
shm_rearrange_experts_ips_size = shared_memory.SharedMemory(
create=False,
size=rearrange_experts_ips_size.nbytes,
name=self.get_unique_name("rearrange_experts_ips_size"),
)
rearrange_experts_ips_size_array = np.ndarray(
rearrange_experts_ips_size.shape,
dtype=rearrange_experts_ips_size.dtype,
buffer=shm_rearrange_experts_ips_size.buf,
)
shm_rearrange_experts_ips_list = shared_memory.SharedMemory(
create=False,
size=1024,
name=self.get_unique_name("rearrange_experts_ips_list"),
)
rearrange_experts_status = np.zeros([1], dtype=np.int32)
shm_rearrange_experts_status = shared_memory.SharedMemory(
create=False,
size=rearrange_experts_status.nbytes,
name=self.get_unique_name("rearrange_experts_status"),
)
rearrange_experts_status_array = np.ndarray(
rearrange_experts_status.shape,
dtype=rearrange_experts_status.dtype,
buffer=shm_rearrange_experts_status.buf,
)
signal_update_weight_from_disk = np.zeros([1], dtype=np.int32)
shm_signal_update_weight_from_disk = shared_memory.SharedMemory(
create=False,
size=signal_update_weight_from_disk.nbytes,
name=self.get_unique_name("signal_update_weight_from_disk"),
)
signal_update_weight_from_disk_array = np.ndarray(
signal_update_weight_from_disk.shape,
dtype=signal_update_weight_from_disk.dtype,
buffer=shm_signal_update_weight_from_disk.buf,
)
experts_token_stats = np.zeros((self.num_hidden_layers, self.num_logical_experts), dtype=np.int32)
shm_all_experts_token_stats = shared_memory.SharedMemory(
create=False,
size=experts_token_stats.nbytes,
name=self.get_unique_name("all_experts_token_stats"),
)
while True:
if self.rank == 0:
now = int(time.time())
if rearrange_experts_ips_size_array[0] > 0:
# step 1. all reduce experts token stats
address = bytes(shm_rearrange_experts_ips_list.buf[: rearrange_experts_ips_size_array[0]]).decode(
"utf-8"
)
self.logger.info(f"redundant_expert: all rank ips {address}")
rearrange_experts_ips_size_array[0] = 0
rearrange_experts_status_array[0] = RearrangeExpertState.doing.value
self.dp_rank_address = address.strip().split(";")
if self.allreduce_experts_stat():
self.need_allgather_load_weight_result = True
self.load_weight_begin_ts = now
self.logger.info("redundant_expert: all-reduce experts stats success")
else:
rearrange_experts_status_array[0] = RearrangeExpertState.free.value
self.logger.warning("redundant_expert: all-reduce experts stats fail")
elif self.need_allgather_load_weight_result and self.allreduce_load_weight_result():
# step 3. all reduce the result of load weight from disk
self.need_allgather_load_weight_result = False
rearrange_experts_status_array[0] = RearrangeExpertState.load_succ.value
self.rearrange_end_ts = now
if rearrange_experts_status_array[0] > 1 and (
now - self.rearrange_end_ts > self.rearrange_reset_interval
):
# reset rearrange status
rearrange_experts_status_array[0] = RearrangeExpertState.free.value
if signal_update_weight_from_disk_array[0] == 1:
# step 2. async load weight: disk -> memory
expert_token_stats = np.ndarray(
experts_token_stats.shape,
dtype=experts_token_stats.dtype,
buffer=shm_all_experts_token_stats.buf,
)
self.model_tokens_per_expert_stats_list[:] = expert_token_stats[:]
self.calculate_expert_rank_table()
self.update_weight_from_disk()
signal_update_weight_from_disk_array[0] = 0
time.sleep(0.5)
def calculate_expert_rank_table(self, is_init=False):
"""
Recompute the expert placement tables from the current per-expert workload statistics.
"""
num_groups = self.num_groups
num_nodes = self.num_nodes
num_gpus = self.num_gpus
eplb_strategy = self.eplb_config.redundant_expert_eplb_strategy
if is_init:
num_groups = 1
num_nodes = 2
num_gpus = 2 * 8
eplb_strategy = ""
# eplb
rank_expert_list, logical_to_physical_map, expert_count = rebalance_experts(
self.model_tokens_per_expert_stats_list,
self.num_replicas,
num_groups,
num_nodes,
num_gpus,
eplb_strategy,
)
# backup info
self.last_model_ep_rank_to_expert_id_list[:] = self.model_ep_rank_to_expert_id_list[:]
self.last_model_expert_id_to_ep_rank_array[:] = self.model_expert_id_to_ep_rank_array[:]
self.last_model_expert_in_rank_num_list[:] = self.model_expert_in_rank_num_list[:]
# update model info
self.model_ep_rank_to_expert_id_list[:] = rank_expert_list[:]
self.model_expert_id_to_ep_rank_array.fill(-1)
self.model_expert_id_to_ep_rank_array[..., : logical_to_physical_map.shape[-1]] = logical_to_physical_map[:]
self.model_expert_in_rank_num_list[:] = expert_count[:]
if self.rank == 0:
workload = RedundantExpertWorkload()
workload.tokens_per_expert_stats_list = self.model_tokens_per_expert_stats_list.tolist()
workload.ep_rank_to_expert_id_list = rank_expert_list.tolist()
workload.expert_id_to_ep_rank_array = logical_to_physical_map.tolist()
workload.expert_in_rank_num_list = expert_count.tolist()
self.logger.info(workload.dump())
def update_weight_from_disk(self):
"""
update_weight_from_disk
"""
begin_time = time.time()
result_update_weight_from_disk = np.zeros([1], dtype=np.int32)
shm_result_update_weight_from_disk = shared_memory.SharedMemory(
create=False,
size=result_update_weight_from_disk.nbytes,
name=self.get_unique_name("result_update_weight_from_disk"),
)
result_update_weight_from_disk_array = np.ndarray(
result_update_weight_from_disk.shape,
dtype=result_update_weight_from_disk.dtype,
buffer=shm_result_update_weight_from_disk.buf,
)
result_update_weight_from_disk_array[0] = 0
self.logger.info(f"redundant_expert: update_weight_from_disk send to async process, rank {self.rank}")
self.parent_mg_conn.send(
{
"old_model_ep_rank_to_expert_id_list": self.last_model_ep_rank_to_expert_id_list,
"new_model_ep_rank_to_expert_id_list": self.model_ep_rank_to_expert_id_list,
}
)
self.logger.info(f"redundant_expert: update_weight_from_disk recv from async process, rank {self.rank}")
response = self.parent_data_conn.recv()
self.tensor_infos = response["weights"]
# Record the result of the weight load
result_update_weight_from_disk_array[0] = 1 if response["result"] else -1
self.logger.info(
"redundant_expert: update_weight_from_disk end, rank"
+ f" {self.rank} {response['result']}, cost {int(time.time() - begin_time)}s"
)
def allreduce_experts_stat(self):
"""
Gather per-expert workload from all DP ranks and broadcast the aggregated statistics.
"""
if not self.allgather_expert_token_stats():
return False
return self.broadcast_expert_token_stats()
def allgather_expert_token_stats(self):
"""
allgather_expert_token_stats
"""
success_count = 0
expert_token_stats = np.zeros((self.num_hidden_layers, self.num_logical_experts), dtype=np.int32)
for addr in self.dp_rank_address:
try:
# TODO: retry failed requests
params = {"user": self.api_user, "passwd": self.api_passwd}
res = requests.post(
f"http://{addr}/get_per_expert_tokens_stats",
json=params,
timeout=self.http_timeout,
)
if res.status_code != HTTPStatus.OK:
self.logger.warning(
"redundant_expert: allgather_expert_token_stats fail. "
+ f"addr {addr}, res {res.status_code} {res.json()}"
)
break
success_count += 1
expert_token_stats += np.array(res.json()["data"], dtype=np.int32)
except Exception as e:
self.logger.error(f"redundant_expert: allgather_expert_token_stats fail. addr {addr}, error {e}")
if success_count == len(self.dp_rank_address):
self.need_rearrange_expert = True
self.model_tokens_per_expert_stats_list[:] = expert_token_stats[:]
self.logger.info("redundant_expert: allgather_expert_token_stats success")
return True
self.logger.info(
"redundant_expert: allgather_expert_token_stats fail. "
+ f"succ {success_count} total {len(self.dp_rank_address)}"
)
return False
def broadcast_expert_token_stats(self):
"""
broadcast_expert_token_stats
"""
success_count = 0
for addr in self.dp_rank_address:
try:
params = {
"user": self.api_user,
"passwd": self.api_passwd,
"action": "recv_expert_weight",
"data": self.model_tokens_per_expert_stats_list.tolist(),
}
res = requests.post(
f"http://{addr}/rearrange_experts",
json=params,
timeout=self.http_timeout,
)
if res.status_code != HTTPStatus.OK:
self.logger.warning(
"redundant_expert: broadcast_expert_token_stats fail. "
+ f"addr {addr}, res {res.status_code} {res.json()}"
)
break
success_count += 1
except Exception as e:
self.logger.error(
f"redundant_expert: broadcast_expert_token_stats request fail. addr {addr}, error {e}"
)
if success_count == len(self.dp_rank_address):
self.logger.info("redundant_expert: broadcast_expert_token_stats success")
return True
self.logger.info(
"redundant_expert: broadcast_expert_token_stats failed, "
+ f"succ {success_count} total {len(self.dp_rank_address)}"
)
return False
def allreduce_load_weight_result(self):
"""
Check whether every DP rank has finished loading weights; returns True once this rearrangement round no longer needs to wait.
"""
if int(time.time()) - self.load_weight_begin_ts > self.load_weight_timeout:
self.logger.info(f"redundant_expert: allreduce_load_weight_result timeout {self.load_weight_timeout}s")
return True
all_success, exist_fail = self.allgather_load_weight_result()
if exist_fail:
# If any DP rank failed to load weights, terminate this rearrangement
self.logger.warning("redundant_expert: allreduce_load_weight_result exist fail, terminate this rearrange")
return True
if not all_success:
self.logger.info("redundant_expert: allreduce_load_weight_result waiting")
return False
# self.broadcast_load_weight_success()
if not exist_fail and all_success:
# Prefill instances must wait for the scheduler cordon before switching weights
if (
self.fd_config.splitwise_role == "decode"
or not self.eplb_config.redundant_expert_enable_schedule_cordon
):
self.logger.info("redundant_expert: allreduce_load_weight_result success, notify infer.py")
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
shm_signal_update_weight_from_tensor = shared_memory.SharedMemory(
create=False,
size=signal_update_weight_from_tensor.nbytes,
name=self.get_unique_name("signal_update_weight_from_tensor"),
)
signal_update_weight_from_tensor_array = np.ndarray(
signal_update_weight_from_tensor.shape,
dtype=signal_update_weight_from_tensor.dtype,
buffer=shm_signal_update_weight_from_tensor.buf,
)
signal_update_weight_from_tensor_array[0] = 1
return True
def allgather_load_weight_result(self):
"""
allgather_load_weight_result
"""
all_success, exist_fail = False, False
success_count, fail_count = 0, 0
for addr in self.dp_rank_address:
try:
params = {
"user": self.api_user,
"passwd": self.api_passwd,
"action": "check_load_weight_result",
}
res = requests.post(
f"http://{addr}/check_redundant",
json=params,
timeout=self.http_timeout,
)
if res.status_code != HTTPStatus.OK:
self.logger.warning(
"redundant_expert: allgather_load_weight_result fail. "
+ f"addr {addr}, res {res.status_code} {res.json()}"
)
break
result = res.json()["data"]
self.logger.info(
f"redundant_expert: allgather_load_weight_result success. addr {addr}, result {result}"
)
if result == 1:
success_count += 1
elif result == -1:
fail_count += 1
self.logger.error(
f"redundant_expert: allgather_load_weight_result fail. addr {addr}, result {result}"
)
exist_fail = True
except Exception as e:
self.logger.error(f"redundant_expert: allgather_load_weight_result error. addr {addr}, error {e}")
if success_count == len(self.dp_rank_address):
self.logger.info("redundant_expert: allgather_load_weight_result all success")
all_success = True
else:
self.logger.info(
"redundant_expert: allgather_load_weight_result not all ready, "
+ f"succ {success_count} fail {fail_count} total {len(self.dp_rank_address)}"
)
return all_success, exist_fail
def init_shared_memory_for_eplb_rank0(rank):
"""Create (create=True) the rank-0 signal and IP-list shared-memory blocks that the manager later attaches to."""
rearrange_experts_ips_size = np.zeros([1], dtype=np.int32)
shm_rearrange_experts_ips_size = shared_memory.SharedMemory(
create=True,
size=rearrange_experts_ips_size.nbytes,
name=f"{envs.get_unique_name('rearrange_experts_ips_size_dprank' + rank)}",
)
rearrange_experts_ips_size_array = np.ndarray(
rearrange_experts_ips_size.shape,
dtype=rearrange_experts_ips_size.dtype,
buffer=shm_rearrange_experts_ips_size.buf,
)
shm_rearrange_experts_ips_list = shared_memory.SharedMemory(
create=True,
size=envs.FD_REDUNDANT_EXPERT_IP_SHM_SIZE,
name=f"{envs.get_unique_name('rearrange_experts_ips_list_dprank' + rank)}",
)
# Track the expert rearrangement state
rearrange_experts_status = np.zeros([1], dtype=np.int32)
shm_rearrange_experts_status = shared_memory.SharedMemory(
create=True,
size=rearrange_experts_status.nbytes,
name=f"{envs.get_unique_name('rearrange_experts_status_dprank' + rank)}",
)
rearrange_experts_status_array = np.ndarray(
rearrange_experts_status.shape, dtype=rearrange_experts_status.dtype, buffer=shm_rearrange_experts_status.buf
)
# Signal that weights should be swapped in from the shared-memory tensors
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
shm_signal_update_weight_from_tensor = shared_memory.SharedMemory(
create=True,
size=signal_update_weight_from_tensor.nbytes,
name=f"{envs.get_unique_name('signal_update_weight_from_tensor_dprank' + rank) }",
)
signal_update_weight_from_tensor_array = np.ndarray(
signal_update_weight_from_tensor.shape,
dtype=signal_update_weight_from_tensor.dtype,
buffer=shm_signal_update_weight_from_tensor.buf,
)
return (
rearrange_experts_ips_size_array,
shm_rearrange_experts_ips_list,
rearrange_experts_status_array,
signal_update_weight_from_tensor_array,
)
def init_shared_memory_for_eplb_each_rank(fd_config, rank):
"""Create the per-rank shared-memory blocks for workload statistics and weight-update signals."""
# Track per-expert workload
num_layers = fd_config.model_config.num_hidden_layers
num_experts = fd_config.model_config.moe_num_experts
experts_token_stats = np.zeros((num_layers, num_experts), dtype=np.int32)
shm_local_experts_token_stats = shared_memory.SharedMemory(
create=True,
size=experts_token_stats.nbytes,
name=f"{envs.get_unique_name('local_experts_token_stats_dprank' + rank)}",
)
local_experts_token_stats_array = np.ndarray(
experts_token_stats.shape, dtype=experts_token_stats.dtype, buffer=shm_local_experts_token_stats.buf
)
# TODO: the global expert workload is identical everywhere, so all DP ranks on a node could share one copy, but concurrent updates by multiple DP ranks must be avoided
shm_all_experts_token_stats = shared_memory.SharedMemory(
create=True,
size=experts_token_stats.nbytes,
name=f"{envs.get_unique_name('all_experts_token_stats_dprank' + rank)}",
)
expert_tokens_stats_array = np.ndarray(
experts_token_stats.shape, dtype=experts_token_stats.dtype, buffer=shm_all_experts_token_stats.buf
)
# Signal that weights should be loaded from disk
signal_update_weight_from_disk = np.zeros([1], dtype=np.int32)
shm_signal_update_weight_from_disk = shared_memory.SharedMemory(
create=True,
size=signal_update_weight_from_disk.nbytes,
name=f"{envs.get_unique_name('signal_update_weight_from_disk_dprank' + rank)}",
)
signal_update_weight_from_disk_array = np.ndarray(
signal_update_weight_from_disk.shape,
dtype=signal_update_weight_from_disk.dtype,
buffer=shm_signal_update_weight_from_disk.buf,
)
# Result of loading weights from disk
result_update_weight_from_disk = np.zeros([1], dtype=np.int32)
shm_result_update_weight_from_disk = shared_memory.SharedMemory(
create=True,
size=result_update_weight_from_disk.nbytes,
name=f"{envs.get_unique_name('result_update_weight_from_disk_dprank' + rank)}",
)
result_update_weight_from_disk_array = np.ndarray(
result_update_weight_from_disk.shape,
dtype=result_update_weight_from_disk.dtype,
buffer=shm_result_update_weight_from_disk.buf,
)
# Signal to clear the expert token statistics
signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32)
shm_signal_clear_experts_token_stats = shared_memory.SharedMemory(
create=True,
size=signal_clear_experts_token_stats.nbytes,
name=f"{envs.get_unique_name('signal_clear_experts_token_stats_dprank' + rank)}",
)
signal_clear_experts_token_stats_array = np.ndarray(
signal_clear_experts_token_stats.shape,
dtype=signal_clear_experts_token_stats.dtype,
buffer=shm_signal_clear_experts_token_stats.buf,
)
return (
local_experts_token_stats_array,
expert_tokens_stats_array,
signal_update_weight_from_disk_array,
result_update_weight_from_disk_array,
signal_clear_experts_token_stats_array,
)
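A hedged bootstrap sketch: the callers of these two helpers are not shown in this diff, but the create=True / create=False pairing implies they must run before the manager and the worker loop attach. fd_config stands for a fully initialized FDConfig; the ordering is illustrative only.
from fastdeploy.eplb.experts_manager import (
    RedundantExpertManager,
    init_shared_memory_for_eplb_each_rank,
    init_shared_memory_for_eplb_rank0,
)
rank, ep_size = 0, 32
if rank == 0:
    init_shared_memory_for_eplb_rank0(rank)  # rank-0-only signal and IP-list blocks
init_shared_memory_for_eplb_each_rank(fd_config, rank)  # per-rank workload/signal blocks
manager = RedundantExpertManager(rank=rank, ep_size=ep_size, fd_config=fd_config)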

fastdeploy/eplb/utils.py (new file, 60 lines)
View File

@@ -0,0 +1,60 @@
"""eplb utilities"""
import json
import os
import time
from enum import Enum
class RedundantExpertWorkload:
"""Redundant Expert Workload"""
def __init__(self, redundant_expert_meta_dir="/tmp/redundant_expert_meta"):
self.update_timestamp = time.time()
self.tokens_per_expert_stats_list = None
self.ep_rank_to_expert_id_list = None
self.expert_id_to_ep_rank_array = None
self.expert_in_rank_num_list = None
self.cost_milliseconds = 0
self.meta_file_name = f"{redundant_expert_meta_dir}/rearrange-experts.json"
if not os.path.exists(redundant_expert_meta_dir):
os.makedirs(redundant_expert_meta_dir, exist_ok=True)
def __json__(self):
return self.__dict__
def dump(self):
"""Dump the object to a JSON file."""
begin = time.time()
try:
with open(self.meta_file_name, "w") as fout:
json.dump(self.__dict__, fout)
except Exception as e:
return f"redundant_expert: dump expert workload failed, {e}"
cost_time = int((time.time() - begin) * 1000 * 1000)
return f"redundant_expert: dump expert workload result in {cost_time} us"
def load(self):
"""Load the object from a JSON file."""
if not os.path.exists(self.meta_file_name):
return {}, f"redundant_expert: file {self.meta_file_name} is not exists"
try:
with open(self.meta_file_name, "r") as fin:
meta = json.load(fin)
self.__dict__.update(meta)
return self.__json__(), "ok"
except Exception as e:
return {}, f"redundant_expert: load file {self.meta_file_name} failed, {e}"
class RearrangeExpertState(Enum):
"""RearrangeExpertState"""
free = 0
doing = 1
load_succ = 2 # load weight from disk success
done = 3
if __name__ == "__main__":
print(RedundantExpertWorkload("/tmp").load())
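# For reference, the rearrangement flow these states describe (derived from the manager and worker changes in this commit):
# free -> doing      : rank 0 receives the participating IPs and all-reduces the expert token stats
# doing -> load_succ : every DP rank's async loader has staged the new expert weights in shared memory
# load_succ -> done  : the worker processes broadcast the magic number and swap the weights in
# done -> free       : reset after rearrange_reset_interval seconds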

View File

@@ -18,6 +18,7 @@ import argparse
import json
import os
import time
from multiprocessing import shared_memory
from typing import Tuple
import numpy as np
@@ -30,6 +31,7 @@ from fastdeploy.config import (
CacheConfig,
DeviceConfig,
EarlyStopConfig,
EPLBConfig,
ErnieArchitectures,
FDConfig,
GraphOptimizationConfig,
@@ -40,6 +42,14 @@ from fastdeploy.config import (
SpeculativeConfig,
StructuredOutputsConfig,
)
from fastdeploy.eplb.async_expert_loader import (
MODEL_MAIN_NAME,
REARRANGE_EXPERT_MAGIC_NUM,
create_mmap,
load_tensor_from_shm_mem,
)
from fastdeploy.eplb.experts_manager import RedundantExpertManager
from fastdeploy.eplb.utils import RearrangeExpertState
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
from fastdeploy.inter_communicator import (
ExistTaskStatus,
@@ -147,6 +157,7 @@ class PaddleDisWorkerProc:
self.parallel_config = fd_config.parallel_config
self.cache_config = fd_config.cache_config
self.scheduler_config = fd_config.scheduler_config
self.eplb_config = fd_config.eplb_config
# TODO(gongshaotian): Use worker factory to get worker
self.worker = get_worker(fd_config=fd_config, local_rank=self.local_rank, rank=self.ranks)
@@ -245,6 +256,18 @@ class PaddleDisWorkerProc:
create=False,
)
def update_weights_from_tensor(self, mmap_infos):
"""
Swap the newly staged expert weights in from shared memory and refresh the expert placement tables.
"""
state_dicts = load_tensor_from_shm_mem(self.experts_manager.tensor_infos, mmap_infos[MODEL_MAIN_NAME], logger)
rank_expert_list, logical_to_physical_map, expert_count = self.experts_manager.get_ep_rank_to_expert_id_list()
self.worker.get_model().redundant_table_manger.update_expert_rank_table(
rank_expert_list, logical_to_physical_map, expert_count
)
# TO BE FIXED
self.worker.get_model().update_state_dict(state_dicts)
def _broadcast_model_weights_signal(self, src: int, group) -> int:
model_weights_signal_tensor = paddle.full(shape=[1], fill_value=self.model_weights_signal[0], dtype="int32")
paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group)
@@ -260,7 +283,63 @@ class PaddleDisWorkerProc:
"""Main event loop for Paddle Distributed Workers.
TODO(gongshaotian): support remote calling of functions that control worker.
"""
if self.eplb_config.enable_redundant_experts:
self.last_dump_expert_workload_ts = 0
self.experts_manager = RedundantExpertManager(
rank=self.local_rank, ep_size=self.ranks, fd_config=self.fd_config
)
num_layers = self.fd_config.model_config.num_hidden_layers
num_experts = self.fd_config.model_config.moe_num_experts
expert_token_stats = np.zeros((num_layers, num_experts), dtype=np.int32)
shm_local_experts_token_stats = shared_memory.SharedMemory(
create=False,
size=expert_token_stats.nbytes,
name=f"{envs.get_unique_name('local_experts_token_stats_dprank' + self.local_rank)}",
)
expert_tokens_stats_array = np.ndarray(
expert_token_stats.shape, dtype=expert_token_stats.dtype, buffer=shm_local_experts_token_stats.buf
)
signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32)
shm_signal_clear_experts_token_stats = shared_memory.SharedMemory(
create=False,
size=signal_clear_experts_token_stats.nbytes,
name=f"{envs.get_unique_name('signal_clear_experts_token_stats_dprank' + self.local_rank)}",
)
signal_clear_experts_token_stats_array = np.ndarray(
signal_clear_experts_token_stats.shape,
dtype=signal_clear_experts_token_stats.dtype,
buffer=shm_signal_clear_experts_token_stats.buf,
)
if self.local_rank == 0:
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
shm_signal_update_weight_from_tensor = shared_memory.SharedMemory(
create=False,
size=signal_update_weight_from_tensor.nbytes,
name=f"{envs.get_unique_name('signal_update_weight_from_tensor_dprank' + self.local_rank)}",
)
signal_update_weight_from_tensor_array = np.ndarray(
signal_update_weight_from_tensor.shape,
dtype=signal_update_weight_from_tensor.dtype,
buffer=shm_signal_update_weight_from_tensor.buf,
)
rearrange_experts_status = np.zeros([1], dtype=np.int32)
shm_rearrange_experts_status = shared_memory.SharedMemory(
create=False,
size=rearrange_experts_status.nbytes,
name=f"{envs.get_unique_name('rearrange_experts_status_dprank' + self.local_rank)}",
)
rearrange_experts_status_array = np.ndarray(
rearrange_experts_status.shape,
dtype=rearrange_experts_status.dtype,
buffer=shm_rearrange_experts_status.buf,
)
expert_workload_dump_interval = envs.FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL
mmap_infos = create_mmap(
[MODEL_MAIN_NAME], self.local_rank, self.ranks, shm_uuid=os.getenv("SHM_UUID", ""), logger=logger
)
# Currently, only support single node
self.nnode = int((self.parallel_config.tensor_parallel_size + 7) // 8)
req_ids = []
@@ -268,6 +347,44 @@ class PaddleDisWorkerProc:
local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
self.model_weights_signal = np.zeros([1], dtype=np.int32)
while True:
if self.eplb_config.enable_redundant_experts:
rearrange_time = time.time()
# Collect per-expert token statistics
if expert_tokens_stats_array is not None and (
int(rearrange_time) - self.last_dump_expert_workload_ts > expert_workload_dump_interval
):
self.last_dump_expert_workload_ts = int(rearrange_time)
clear_stat = False
if signal_clear_experts_token_stats_array[0] == 1:
clear_stat = True
signal_clear_experts_token_stats_array[0] = 0
(
new_stats_array,
_,
_,
_,
) = self.worker.get_model().redundant_table_manger.get_expert_tokens_stats(clear_stat=clear_stat)
expert_tokens_stats_array[:] = new_stats_array[:]
elif expert_tokens_stats_array is None:
logger.warning("redundant_expert: expert_tokens_stats_array not init")
# All DP ranks switch to the new weights in sync
broadcast_value = 0
if self.local_rank == 0 and signal_update_weight_from_tensor_array[0] == 1:
logger.info("redundant_expert: update_weight_from_tensor broadcast signal")
signal_update_weight_from_tensor_array[0] = 0
broadcast_value = REARRANGE_EXPERT_MAGIC_NUM
data = paddle.to_tensor([broadcast_value])
paddle.distributed.broadcast(data, 0)
if data[0] == REARRANGE_EXPERT_MAGIC_NUM:
self.update_weights_from_tensor(mmap_infos)
logger.info(
f"redundant_expert: update_weight_from_tensor success, cost {(time.time() - rearrange_time)*1000}ms"
)
paddle.distributed.barrier()
if self.local_rank == 0:
rearrange_experts_status_array[0] = RearrangeExpertState.done.value
logger.info("redundant_expert: done")
if self.local_rank % self.parallel_config.tensor_parallel_size == 0:
if self.model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
self.model_weights_signal[0] = int(self.model_weights_status.value[0])
@@ -754,6 +871,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
plas_attention_config = PlasAttentionConfig(args.plas_attention_config)
early_stop_config = EarlyStopConfig(args.early_stop_config)
eplb_config = EPLBConfig()
structured_outputs_config: StructuredOutputsConfig = StructuredOutputsConfig(args=vars(args))
@@ -819,6 +937,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
ips=args.ips,
plas_attention_config=plas_attention_config,
structured_outputs_config=structured_outputs_config,
eplb_config=eplb_config,
)
update_fd_config_for_mm(fd_config)
if fd_config.load_config.load_choices == "default_v1" and not v1_loader_support(fd_config):