From f83d0cf127a24ee16396302c29f6364b90104336 Mon Sep 17 00:00:00 2001 From: chenjian <1435317881@qq.com> Date: Mon, 3 Nov 2025 14:08:15 +0800 Subject: [PATCH] [Feature] Support eplb for fd (#4599) * support eplb * support eplb --------- Co-authored-by: kevin --- fastdeploy/config.py | 25 + fastdeploy/engine/args_utils.py | 4 + fastdeploy/envs.py | 21 + fastdeploy/eplb/__init__.py | 3 + fastdeploy/eplb/async_expert_loader.py | 405 +++++++++++++++++ fastdeploy/eplb/eplb.py | 288 ++++++++++++ fastdeploy/eplb/experts_manager.py | 602 +++++++++++++++++++++++++ fastdeploy/eplb/utils.py | 60 +++ fastdeploy/worker/worker_process.py | 119 +++++ 9 files changed, 1527 insertions(+) create mode 100644 fastdeploy/eplb/__init__.py create mode 100644 fastdeploy/eplb/async_expert_loader.py create mode 100644 fastdeploy/eplb/eplb.py create mode 100644 fastdeploy/eplb/experts_manager.py create mode 100644 fastdeploy/eplb/utils.py diff --git a/fastdeploy/config.py b/fastdeploy/config.py index e49b8476b..287532922 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1118,6 +1118,29 @@ class PoolerConfig: """ +class EPLBConfig: + """ + Configuration for EPLB manager. + """ + + def __init__( + self, + ): + self.enable_redundant_experts = envs.FD_ENABLE_REDUNDANT_EXPERTS + self.redundant_experts_num = envs.FD_REDUNDANT_EXPERTS_NUM + self.redundant_expert_ip_shm_size = envs.FD_REDUNDANT_EXPERT_IP_SHM_SIZE + self.redundant_expert_meta_dir = envs.FD_REDUNDANT_EXPERT_META_DIR + self.redundant_expert_api_user = envs.FD_REDUNDANT_EXPERT_API_USER + self.redundant_expert_api_password = envs.FD_REDUNDANT_EXPERT_API_PASSWORD + self.redundant_expert_eplb_strategy = envs.FD_REDUNDANT_EXPERT_EPLB_STRATEGY + self.redundant_expert_dump_workload_interval = envs.FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL + self.redundant_expert_async_load_model_shmem_size_gb = envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB + self.redundant_expert_enable_schedule_cordon = envs.FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON + self.model_use_safetensors = envs.FD_MODEL_USE_SAFETENSORS + self.model_use_offline_quant = envs.FD_MODEL_USE_OFFLINE_QUANT + self.moe_quant_type = envs.FD_MOE_QUANT_TYPE + + class CacheConfig: """ Configuration for the KV cache. @@ -1379,6 +1402,7 @@ class FDConfig: graph_opt_config: GraphOptimizationConfig = None, plas_attention_config: PlasAttentionConfig = None, speculative_config: SpeculativeConfig = None, + eplb_config: EPLBConfig = None, structured_outputs_config: StructuredOutputsConfig = None, tokenizer: str = None, ips: str = None, @@ -1398,6 +1422,7 @@ class FDConfig: self.scheduler_config: SchedulerConfig = scheduler_config # type: ignore self.parallel_config = parallel_config # type: ignore self.speculative_config: SpeculativeConfig = speculative_config + self.eplb_config: Optional[EPLBConfig] = eplb_config self.device_config: DeviceConfig = device_config # type: ignore self.load_config: LoadConfig = load_config self.quant_config: Optional[QuantConfigBase] = quant_config diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 30dbff6b7..2ce7482d0 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -26,6 +26,7 @@ from fastdeploy.config import ( CacheConfig, ConvertOption, EarlyStopConfig, + EPLBConfig, FDConfig, GraphOptimizationConfig, LoadConfig, @@ -1076,6 +1077,8 @@ class EngineArgs: Create and return a Config object based on the current settings. """ all_dict = asdict(self) + eplb_cfg = EPLBConfig() + all_dict["enable_redundant_experts"] = eplb_cfg.enable_redundant_experts model_cfg = ModelConfig(all_dict) # XPU currently disable prefix cache for VL model @@ -1134,6 +1137,7 @@ class EngineArgs: load_config=load_cfg, parallel_config=parallel_cfg, speculative_config=speculative_cfg, + eplb_config=eplb_cfg, structured_outputs_config=structured_outputs_config, ips=self.ips, use_warmup=self.use_warmup, diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 3d4c2feaa..258631f58 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -128,6 +128,27 @@ environment_variables: dict[str, Callable[[], Any]] = { "FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")), # Count for cache_transfer_manager process error "FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")), + # EPLB related + "FD_ENABLE_REDUNDANT_EXPERTS": lambda: int(os.getenv("FD_ENABLE_REDUNDANT_EXPERTS", "0")) == 1, + "FD_REDUNDANT_EXPERTS_NUM": lambda: int(os.getenv("FD_REDUNDANT_EXPERTS_NUM", "0")), + "FD_REDUNDANT_EXPERT_IP_SHM_SIZE": lambda: int(os.getenv("FD_REDUNDANT_EXPERT_IP_SHM_SIZE", "1024")), + "FD_REDUNDANT_EXPERT_META_DIR": lambda: os.getenv("FD_REDUNDANT_EXPERT_META_DIR", "/tmp/redundant_expert_meta"), + "FD_REDUNDANT_EXPERT_API_USER": lambda: os.getenv("FD_REDUNDANT_EXPERT_API_USER", ""), + "FD_REDUNDANT_EXPERT_API_PASSWORD": lambda: os.getenv("FD_REDUNDANT_EXPERT_API_PASSWORD", ""), + "FD_REDUNDANT_EXPERT_EPLB_STRATEGY": lambda: os.getenv("FD_REDUNDANT_EXPERT_EPLB_STRATEGY", ""), + "FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL": lambda: int( + os.getenv("FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL", "10") + ), + "FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB": lambda: int( + os.getenv("FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB", "0") + ), + "FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON": lambda: int( + os.getenv("FD_REDUNDANT_EXPERT_ENABLE_SCHEDULE_CORDON", "1") + ) + == 1, + "FD_MODEL_USE_SAFETENSORS": lambda: int(os.getenv("FD_MODEL_USE_SAFETENSORS", "1")) == 1, + "FD_MODEL_USE_OFFLINE_QUANT": lambda: int(os.getenv("FD_MODEL_USE_OFFLINE_QUANT", "1")) == 1, + "FD_MOE_QUANT_TYPE": lambda: os.getenv("FD_MOE_QUANT_TYPE", "w4a8"), "ENCODE_FEATURE_BOS_AK": lambda: os.getenv("ENCODE_FEATURE_BOS_AK"), "ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"), } diff --git a/fastdeploy/eplb/__init__.py b/fastdeploy/eplb/__init__.py new file mode 100644 index 000000000..49a2e7b35 --- /dev/null +++ b/fastdeploy/eplb/__init__.py @@ -0,0 +1,3 @@ +""" " +Expert Parallelism Load Balancer (EPLB) +""" diff --git a/fastdeploy/eplb/async_expert_loader.py b/fastdeploy/eplb/async_expert_loader.py new file mode 100644 index 000000000..cfffde97c --- /dev/null +++ b/fastdeploy/eplb/async_expert_loader.py @@ -0,0 +1,405 @@ +"""AsyncExpertLoader async load the model weights of the MoE experts.""" + +import ctypes +import os +import time +import traceback +from typing import List, Tuple + +import numpy as np +import paddle + +from fastdeploy import envs + +REARRANGE_EXPERT_MAGIC_NUM = 147183647 +REARRANGE_ORIGINATOR_EP_RANK = 0 +CHECK_TIME_INTERNAL = 3 +HTTP_RETRY_NUM = 5 +CHECK_TIMEOUT = 120 + + +libc = ctypes.CDLL(None) + +libc.mmap.argtypes = [ + ctypes.c_void_p, # void *addr + ctypes.c_size_t, # size_t length + ctypes.c_int, # int prot + ctypes.c_int, # int flags + ctypes.c_int, # int fd + ctypes.c_size_t, # off_t offset +] +libc.mmap.restype = ctypes.c_void_p +libc.munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t] +libc.munmap.restype = ctypes.c_int + +PROT_READ = 0x1 +PROT_WRITE = 0x2 +MAP_SHARED = 0x01 +MAP_ANONYMOUS = 0x20 +MAP_FAILED = -1 + +G = 1024**3 +TOTAL_MODEL_SIZE = 350 +MAIN_MODEL_REDUNDANT_SHM_SIZE = 5 + +MODEL_MAIN_NAME = "eplb_main" + + +def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, logger=None): + """create_mmap""" + flags = MAP_SHARED + prot = PROT_READ | PROT_WRITE + + main_size = 0 + if envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB == 0: + main_size = TOTAL_MODEL_SIZE // ep_size + else: + main_size = envs.FD_REDUNDANT_EXPERT_ASYNC_LOAD_MODEL_SHMEM_SIZE_GB + main_size = main_size * G + + mmap_infos = {} + + from cuda import cudart + + for name in model_name: + expert_weight_file = f"/dev/shm/{name}_rank_{ep_rank}_expert_weight_{shm_uuid}" + shm_size = main_size + + if not os.path.isfile(expert_weight_file): + open(expert_weight_file, "wb").close() + shm_fd = os.open(expert_weight_file, os.O_RDWR) + os.ftruncate(shm_fd, shm_size) + if logger is not None: + logger.info( + f"redundant_expert: create_mmap file {expert_weight_file}, \ + fd {shm_fd}, size {shm_size}" + ) + + shm_ptr = libc.mmap(0, ctypes.c_size_t(shm_size), prot, flags, shm_fd, 0) + if shm_ptr == MAP_FAILED: + raise OSError(f"redundant_expert: mmap {expert_weight_file} failed: {ctypes.get_errno()}") + + shm_ptr = ctypes.cast(shm_ptr, ctypes.POINTER(ctypes.c_int8)) + addr = ctypes.addressof(shm_ptr.contents) + + # Register memory with CUDA + (ret,) = cudart.cudaHostRegister(addr, shm_size, 0) + if ret != cudart.cudaError_t.cudaSuccess: + raise RuntimeError( + f"cudaHostRegister failed: {cudart.cudaGetErrorString(ret)}," + + f" address {hex(addr)} size {shm_size}, ret: {ret}" + ) + + mmap_infos[name] = shm_ptr + + return mmap_infos + + +def save_tensor_to_shm_mem(cached_weights, file_path, logger=None): + """save_tensor_to_shm_mem""" + tensor_infos = [] + offset = 0 + if not os.path.exists(file_path): + raise OSError("File is not exist") + + shm_size = os.path.getsize(file_path) + + for name, w in cached_weights: + size = w.numel().item() * w.element_size() + # logger.info(f"redundant_expert: save tensor to {name} offset: {offset} size: {size}") + w_ptr = ctypes.string_at(w.data_ptr(), size) + with open(file_path, "r+b") as file: + file.seek(offset) + if offset + size > shm_size: + raise IOError( + f"redundant_expert: Exceeded {file_path} file's size. " + + "Should set a bigger value using env variable." + ) + n = file.write(w_ptr) + assert n == size + tensor_infos.append((name, offset, size, w.shape, w.dtype)) + + offset += size + + sz = offset / 1024 / 1024 / 1024 + if logger is not None: + logger.info(f"redundant_expert: save_tensor_to_shm_mem success. file {file_path} size {sz}G") + + return tensor_infos + + +def load_tensor_from_shm_mem(tensor_infos, shm_ptr, logger=None): + """load_tensor_from_shm_mem""" + # weights_dict = {} + weights_dict = [] + for name, offset, size, shape, dtype in tensor_infos: + # 计算共享内存中张量的地址 + w_addr = ctypes.cast(shm_ptr, ctypes.c_void_p).value + offset + w_ptr = ctypes.cast(w_addr, ctypes.POINTER(ctypes.c_byte)) + # 先读取为字节数组,再通过视图转换成适当类型 + np_array = np.ctypeslib.as_array(w_ptr, shape=(size,)) + + if dtype == paddle.float32: + tmp = np_array.view(np.float32) + tensor = paddle.Tensor(tmp, dtype=paddle.float32, place=paddle.CPUPlace(), zero_copy=True) + elif dtype == paddle.uint8: + tmp = np_array.view(np.uint8) + tensor = paddle.Tensor(tmp, dtype=paddle.uint8, place=paddle.CPUPlace(), zero_copy=True) + elif dtype == paddle.int8: + tmp = np_array.view(np.int8) + tensor = paddle.Tensor(tmp, dtype=paddle.int8, place=paddle.CPUPlace(), zero_copy=True) + elif dtype == paddle.bfloat16: + # NumPy 不支持 bfloat16,因此先以 uint16 读取原始数据,再用 Paddle cast 为 bfloat16 + tmp = np_array.view(np.uint16) + tensor = paddle.Tensor(tmp, dtype=paddle.bfloat16, place=paddle.CPUPlace(), zero_copy=True) + else: + raise TypeError(f"Unsupported dtype: {dtype}") + + assert w_addr == tensor.data_ptr() + # weights_dict[name] = tensor.view(shape) + weights_dict.append((name, tensor.view(shape))) + + if logger is not None: + logger.info("redundant_expert: load_tensor_from_shm_mem succ") + return weights_dict + + +class AsyncEPLoader(object): + """Aynsc Expert loader""" + + def __init__( + self, + model_dir, + rank=8, + expert_per_rank=8, + moe_layer_start_index=3, + moe_quant_type="", + logger=None, + ): + """ + __init__ + """ + self.model_path = model_dir + + self.expert_per_rank = expert_per_rank + self.moe_layer_start_index = moe_layer_start_index + self.ep_rank = rank + self.moe_quant_type = moe_quant_type + + self.old_model_ep_rank_to_expert_id_list = None + self.new_model_ep_rank_to_expert_id_list = None + + self.cached_weights = [] + # self.state_dicts = {} + self.moe_file_names = [] + + self.logger = logger + + def reset(self): + """ + reset + """ + self.old_model_ep_rank_to_expert_id_list = None + self.new_model_ep_rank_to_expert_id_list = None + self.cached_weights = [] + self.moe_file_names = [] + + def load_experts_weight_from_disk(self): + """ + return value: (all_succ whether_load_weight exist_fatal_error message), + exist_fatal_error means all rank need restart + """ + ep_rank = self.ep_rank + start_idx = ep_rank * self.expert_per_rank + end_idx = start_idx + self.expert_per_rank + try: + old_expert_ids_all = self.old_model_ep_rank_to_expert_id_list[:, start_idx:end_idx] + new_expert_ids_all = self.new_model_ep_rank_to_expert_id_list[:, start_idx:end_idx] + need_to_reload = list() + for layer_id in range(len(old_expert_ids_all)): + if layer_id < self.moe_layer_start_index: + continue + new_expert_ids = new_expert_ids_all[layer_id] + old_expert_ids = old_expert_ids_all[layer_id] + if len(new_expert_ids) != len(old_expert_ids): + message = f"redundant_expert: new_expert_ids length not equal to old_expert_ids \ + length layer_id: {layer_id}" + # this is very dangerous and unepxpected, should be fixed + return False, message + # TODO: 按需加载,过滤重复专家 + self.logger.info( + f"redundant_expert: rank {ep_rank} layer {layer_id} old_experts {old_expert_ids}" + + f" new_experts {new_expert_ids}" + ) + need_to_reload.extend([(layer_id, expert_id) for expert_id in new_expert_ids]) + + succ = True + message = "" + if len(need_to_reload) > 0: + if envs.FD_MODEL_USE_SAFETENSORS: + succ, message = self.load_safetensor_fp8_from_disk(need_to_reload) + else: + succ, message = self.load_weight_bf16_from_disk(need_to_reload) + if not succ: + self.logger.info( + f"redundant_expert: load_experts_weight_from_disk fail. rank {ep_rank}, error: {message}" + ) + new_message = f"redundant_expert: load_experts_weight_from_disk fail. rank {ep_rank}, error: {message}" + return False, new_message + self.logger.info(f"redundant_expert: load_experts_weight_from_disk success. rank {ep_rank}") + return True, "redundant_expert: load_experts_weight_from_disk success" + except Exception as e: + message = f"redundant_expert: Failed to load_experts_weight_from_disk ep_rank {ep_rank} excep: {e}" + error_message = traceback.format_exc() + self.logger.error(f"redundant_expert: message: {message} traceback: {error_message}") + return False, message + + def load_weight_bf16_from_disk(self, need_to_reload: List[Tuple[int, int]]): + """load_weight_bf16_from_disk""" + try: + ckpt_up_gate_proj_name = "up_gate_proj" + ckpt_down_proj_name = "down_proj" + for layer_id, expert_id in need_to_reload: + for weight_name in [ckpt_up_gate_proj_name, ckpt_down_proj_name]: + ckpt_file_name = f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{weight_name}.weight" + if ckpt_file_name not in self.moe_file_names: + self.logger.info(f"record redundant_expert: {ckpt_file_name}") + self.moe_file_names.append(ckpt_file_name) + + last_device = paddle.device.get_device() + paddle.set_device("cpu") + + for file_name in self.moe_file_names: + # 判断文件是否存在 + if not os.path.exists(self.model_path + "/merged_tp1_state_split/" + file_name): + # self.logger.info(f"redundant_expert: {file_name} not exist.") + continue + # self.logger.info(f"redundant_expert: Loading expert weights: {file_name}.") + self.state_dicts[file_name] = paddle.load(self.model_path + "/merged_tp1_state_split/" + file_name) + + paddle.set_device(last_device) + self.logger.info("redundant_expert: Loading expert weights end.") + return True, "redundant_expert: Succeeded to loading expert weights." + except Exception as e: + message = f"redundant_expert: Failed to get weights iterator: {e}." + return False, message + + def load_safetensor_fp8_from_disk(self, need_to_reload: List[Tuple[int, int]]): + """load_safetensor_fp8_from_disk""" + """ + ernie.layers.52.mlp.experts.58.up_gate_proj.quant_weight + ernie.layers.52.mlp.experts.58.up_gate_proj.weight_scale + ernie.layers.52.mlp.experts.58.down_proj.quant_weight + ernie.layers.52.mlp.experts.58.down_proj.weight_scale + """ + up_gate_down = ["up_gate_proj", "down_proj"] + quant_weight_scale = ["quant_weight", "weight_scale"] + if self.moe_quant_type == "w4a8": + quant_weight_scale = ["quant_weight"] + ckpt_name = [ + (f"ernie.layers.{layer_id}.mlp.experts.{expert_id}.{proj_name}.{quant_name}") + for layer_id, expert_id in need_to_reload + for proj_name in up_gate_down + for quant_name in quant_weight_scale + ] + ckpt_name_to_safetensor_file = load_ep_checkpoint(self.model_path) + hf_weights_files = list(set(ckpt_name_to_safetensor_file.values())) + state_dicts = {} + + last_device = paddle.device.get_device() + paddle.set_device("cpu") + + from safetensors import safe_open + + for st_file in hf_weights_files: + with safe_open(st_file, framework="np", device="cpu") as f: + for name in f.keys(): + if name in ckpt_name: + weight = f.get_tensor(name) + state_dicts[name] = paddle.Tensor(weight, zero_copy=True) + weights_list = [] + for name in ckpt_name: + weights_list.append((name, state_dicts[name])) + self.cached_weights = weights_list + + paddle.set_device(last_device) + return True, "load_expert_weight_from_disk_safetensor success" + + +def load_ep_checkpoint(model_path): + """ + load ep checkpoint + """ + file_path = os.path.join(model_path, "model.safetensors.index.json") + if not os.path.exists(file_path): + return {} + import json + + with open(file_path, "r") as f: + weight_map = json.load(f)["weight_map"] + state_dict = {k: os.path.join(model_path, v) for k, v in weight_map.items()} + return state_dict + + +def load_model_weights_process( + rank: int, expert_per_rank: int, moe_layer_start_index: int, moe_quant_type: str, data_conn, mg_conn, shm_uuid +): + """ + load_model_weights_process + """ + import faulthandler + + from setproctitle import setproctitle + + setproctitle(f"eplb::async_load_model_{rank}") + faulthandler.enable() + from server.utils import get_logger + + logger = get_logger("eplb_async_loader", "eplb_{0}.log".format(rank)) + logger.info("redundant_expert: load_model_weights_process start") + + paddle.set_device("cpu") + ep_loader = AsyncEPLoader( + rank=rank, + expert_per_rank=expert_per_rank, + moe_layer_start_index=moe_layer_start_index, + moe_quant_type=moe_quant_type, + logger=logger, + ) + + while True: + ep_loader.reset() + data = mg_conn.recv() + + result = True + weight_infos = [] + try: + ep_loader.old_model_ep_rank_to_expert_id_list = data["old_model_ep_rank_to_expert_id_list"] + ep_loader.new_model_ep_rank_to_expert_id_list = data["new_model_ep_rank_to_expert_id_list"] + + begin_time_disk = int(time.time()) + success, message = ep_loader.load_experts_weight_from_disk() + begin_time_shm = int(time.time()) + logger.info( + "redundant_expert: async load load_weight_from_disk, " + + f"succ {success}, cost {begin_time_shm-begin_time_disk}s" + ) + if success: + model_name = MODEL_MAIN_NAME + file_path = f"/dev/shm/{model_name}_rank_{rank}_expert_weight_{shm_uuid}" + weight_infos = save_tensor_to_shm_mem(ep_loader.cached_weights, file_path, logger) + logger.info( + "redundant_expert: async load save_tensor_to_shm_mem, " + + f"tensor nums {len(weight_infos)}, cost {int(time.time()-begin_time_shm)}s" + ) + else: + logger.error(f"redundant_expert: async load load_weight_from_disk failed, error {message}") + result = False + + except Exception as e: + logger.error(f"redundant_expert: async load weights failed, rank {rank} error {e}") + result = False + weight_infos = [] + finally: + request_data = {"result": result, "weights": weight_infos} + data_conn.send(request_data) diff --git a/fastdeploy/eplb/eplb.py b/fastdeploy/eplb/eplb.py new file mode 100644 index 000000000..f9c097eac --- /dev/null +++ b/fastdeploy/eplb/eplb.py @@ -0,0 +1,288 @@ +"""Expert Parallelism Load Balancer (EPLB)""" + +from typing import Tuple + +import numpy as np + + +def balanced_packing(weight: np.ndarray, num_packs: int) -> Tuple[np.ndarray, np.ndarray]: + """ + Pack n weighted objects to m packs, such that each bin contains exactly n/m objects and the weights of all packs + are as balanced as possible. + + Parameters: + weight: [X, n], the weight of each item + num_packs: number of packs + + Returns: + pack_index: [X, n], the pack index of each item + rank_in_pack: [X, n], the rank of the item in the pack + """ + num_layers, num_groups = weight.shape + assert num_groups % num_packs == 0 + groups_per_pack = num_groups // num_packs + + if groups_per_pack == 1: + pack_index = np.arange(weight.shape[-1], dtype=np.int32).reshape(1, -1).repeat(num_layers, axis=0) + rank_in_pack = np.zeros_like(weight, dtype=np.int32) + return pack_index, rank_in_pack + + indices = np.argsort(-weight.astype(np.float32), axis=-1) + pack_index = np.full_like(weight, fill_value=-1, dtype=np.int32) + rank_in_pack = np.full_like(pack_index, fill_value=-1) + for i in range(num_layers): + pack_weights = [0] * num_packs + pack_items = [0] * num_packs + for group in indices[i]: + pack = min( + (i for i in range(num_packs) if pack_items[i] < groups_per_pack), + key=pack_weights.__getitem__, + ) + assert pack_items[pack] < groups_per_pack + pack_index[i, group] = pack + rank_in_pack[i, group] = pack_items[pack] + pack_weights[pack] += weight[i, group] + pack_items[pack] += 1 + return pack_index, rank_in_pack + + +def replicate_experts(weight: np.ndarray, num_phy: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Replicate `num_log` experts to `num_phy` replicas, such that the maximum load of all replicas is minimized. + + Parameters: + weight: [X, num_log] + num_phy: total number of experts after replication + + Returns: + phy2log: [X, num_phy], logical expert id of each physical expert + rank: [X, num_phy], the replica rank + logcnt: [X, num_log], number of replicas for each logical expert + """ + n, num_log = weight.shape + num_redundant = num_phy - num_log + assert num_redundant >= 0 + phy2log = np.arange(num_phy, dtype=np.int32).reshape(1, -1).repeat(n, axis=0) + rank = np.zeros((n, num_phy), dtype=np.int32) + logcnt = np.ones((n, num_log), dtype=np.int32) + arangen = np.arange(n, dtype=np.int32) + for i in range(num_log, num_phy): + redundant_indices = np.argmax(weight / logcnt, axis=-1) + phy2log[:, i] = redundant_indices + rank[:, i] = logcnt[arangen, redundant_indices] + logcnt[arangen, redundant_indices] += 1 + return phy2log, rank, logcnt + + +def rebalance_experts_intra_node( + weight: np.ndarray, + num_physical_experts: int, + num_groups: int, + num_nodes: int, + num_gpus: int, +): + """ + Parameters: + weight: [num_moe_layers, num_logical_experts] + num_physical_experts: number of physical experts after replication + num_groups: number of expert groups + num_nodes: number of server nodes, where the intra-node network (e.g, NVLink) is faster + num_gpus: number of GPUs, must be a multiple of `num_nodes` + + Returns: + physical_to_logical_map: [num_moe_layers, num_physical_experts] + logical_to_physical_map: [num_moe_layers, num_logical_experts, X] + logical_count: [num_moe_layers, num_logical_experts] + """ + + num_layers, num_logical_experts = weight.shape + assert num_logical_experts % num_groups == 0 + num_redundant_experts = num_physical_experts - num_logical_experts + assert num_redundant_experts >= 0 + + assert num_gpus % num_nodes == 0 + num_gpus_per_node = num_gpus // num_nodes + + assert num_physical_experts % num_gpus == 0 + num_physical_experts_per_gpu = num_physical_experts // num_gpus + assert num_physical_experts % num_nodes == 0 + num_physical_experts_per_node = num_physical_experts // num_nodes + + assert num_logical_experts % num_physical_experts_per_node == 0 + # num_logical_nodes = num_logical_experts // num_physical_experts_per_node + assert num_redundant_experts % num_physical_experts_per_node == 0 + # num_redundant_nodes = num_redundant_experts // num_physical_experts_per_node + + def inverse(perm: np.ndarray) -> np.ndarray: + inv = np.empty_like(perm) + inv[np.arange(perm.shape[0])[:, None], perm] = np.arange(perm.shape[1], dtype=np.int32).reshape(1, -1) + return inv + + # Step 1: generate redundant experts by weight. + # shape of tmp2log, tmprank is [num_layers, num_physical_experts] + # shape of logcnt is [num_layers, num_logical_experts] + tmp2log, tmprank, logcnt = replicate_experts(weight, num_physical_experts) + + # Step 2: compute num_tokens of physical experts + # shape of tokens_per_tmp is [num_layers * num_nodes, num_physical_experts_per_node] + tokens_per_tmp = np.take_along_axis(weight / logcnt, tmp2log, axis=-1).reshape(-1, num_physical_experts_per_node) + + # STEP 3: take load balance of gpu cards in node + # shape of gpu_index, rank_in_gpu, tmp2phy, phy2tmp is [num_layers * num_nodes, num_physical_experts_per_node] + gpu_index, rank_in_gpu = balanced_packing(tokens_per_tmp, num_gpus_per_node) + tmp2phy = gpu_index * num_physical_experts_per_gpu + rank_in_gpu + phy2tmp = inverse(tmp2phy) + + # STEP 4: generate final phy2log mapping + tmp2log = tmp2log.reshape(-1, num_physical_experts_per_node) + tmprank = tmprank.reshape(-1, num_physical_experts_per_node) + phy2log = np.take_along_axis(tmp2log, phy2tmp, axis=-1).reshape(-1, num_physical_experts) + phyrank = np.take_along_axis(tmprank, phy2tmp, axis=-1).reshape(-1, num_physical_experts) + return phy2log, phyrank, logcnt + + +def rebalance_experts_hierarchical( + weight: np.ndarray, + num_physical_experts: int, + num_groups: int, + num_nodes: int, + num_gpus: int, +): + """ + Parameters: + weight: [num_moe_layers, num_logical_experts] + num_physical_experts: number of physical experts after replication + num_groups: number of expert groups + num_nodes: number of server nodes, where the intra-node network (e.g, NVLink) is faster + num_gpus: number of GPUs, must be a multiple of `num_nodes` + + Returns: + physical_to_logical_map: [num_moe_layers, num_physical_experts] + logical_to_physical_map: [num_moe_layers, num_logical_experts, X] + logical_count: [num_moe_layers, num_logical_experts] + """ + num_layers, num_logical_experts = weight.shape + assert num_logical_experts % num_groups == 0 + group_size = num_logical_experts // num_groups + assert num_groups % num_nodes == 0 + groups_per_node = num_groups // num_nodes + assert num_gpus % num_nodes == 0 + assert num_physical_experts % num_gpus == 0 + phy_experts_per_gpu = num_physical_experts // num_gpus + + def inverse(perm: np.ndarray) -> np.ndarray: + inv = np.empty_like(perm) + inv[np.arange(perm.shape[0])[:, None], perm] = np.arange(perm.shape[1], dtype=np.int32).reshape(1, -1) + return inv + + # Step 1: pack groups to nodes + tokens_per_group = weight.reshape(num_layers, num_groups, group_size).sum(axis=-1) + group_pack_index, group_rank_in_pack = balanced_packing(tokens_per_group, num_nodes) + log2mlog = ( + ((group_pack_index * groups_per_node + group_rank_in_pack) * group_size)[:, :, None] + + np.arange(group_size, dtype=np.int32) + ).reshape(num_layers, -1) + mlog2log = inverse(log2mlog) + + # Step 2: construct redundant experts within nodes + tokens_per_mlog = np.take_along_axis(weight, mlog2log, axis=-1).reshape(-1, num_logical_experts // num_nodes) + phy2mlog, phyrank, mlogcnt = replicate_experts(tokens_per_mlog, num_physical_experts // num_nodes) + + # Step 3: pack physical_experts to GPUs + tokens_per_phy = np.take_along_axis(tokens_per_mlog / mlogcnt, phy2mlog, axis=-1) + pack_index, rank_in_pack = balanced_packing(tokens_per_phy, num_gpus // num_nodes) + phy2pphy = pack_index * phy_experts_per_gpu + rank_in_pack + pphy2phy = inverse(phy2pphy) + + pphy2mlog = np.take_along_axis(phy2mlog, pphy2phy, axis=-1) # [num_layers * num_nodes, num_log_per_nodes] + pphy2mlog = ( + pphy2mlog.reshape(num_layers, num_nodes, -1) + + np.arange(0, num_logical_experts, num_logical_experts // num_nodes, dtype=np.int32).reshape(1, -1, 1) + ).reshape(num_layers, -1) + pphy2log = np.take_along_axis(mlog2log, pphy2mlog, axis=-1) + pphyrank = np.take_along_axis(phyrank, pphy2phy, axis=-1).reshape(num_layers, -1) + logcnt = np.take_along_axis(mlogcnt.reshape(num_layers, -1), log2mlog, axis=-1) + return pphy2log, pphyrank, logcnt + + +def rebalance_experts( + weight: np.ndarray, + num_replicas: int, + num_groups: int, + num_nodes: int, + num_gpus: int, + eplb_strategy: str = "", +) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Entry point for expert-parallelism load balancer. + + Parameters: + weight: [layers, num_logical_experts], the load statistics for all logical experts + num_replicas: number of physical experts, must be a multiple of `num_gpus` + num_groups: number of expert groups + num_nodes: number of server nodes, where the intra-node network (e.g, NVLink) is faster + num_gpus: number of GPUs, must be a multiple of `num_nodes` + + Returns: + physical_to_logical_map: [layers, num_replicas], the expert index of each replica + logical_to_physical_map: [layers, num_logical_experts, X], the replica indices for each expert + expert_count: [layers, num_logical_experts], number of physical replicas for each logical expert + """ + num_layers, num_logical_experts = weight.shape + weight = weight.astype(np.float32) + if eplb_strategy == "balance_intra_node": + phy2log, phyrank, logcnt = rebalance_experts_intra_node(weight, num_replicas, num_groups, num_nodes, num_gpus) + else: + if num_groups % num_nodes == 0: + # use hierarchical load-balance policy + phy2log, phyrank, logcnt = rebalance_experts_hierarchical( + weight, num_replicas, num_groups, num_nodes, num_gpus + ) + else: + # use global load-balance policy + phy2log, phyrank, logcnt = replicate_experts(weight, num_replicas) + maxlogcnt = logcnt.max() + log2phy = np.full((num_layers, num_logical_experts, maxlogcnt), -1, dtype=np.int32) + np.put_along_axis( + log2phy.reshape(num_layers, -1)[:, :, None], + (phy2log * maxlogcnt + phyrank)[:, :, None], + np.arange(num_replicas, dtype=np.int32).reshape(1, -1).repeat(num_layers, axis=0)[:, :, None], + axis=1, + ) + return phy2log, log2phy, logcnt + + +__all__ = ["rebalance_experts"] + + +def main(): + """ + main + """ + num_hidden_layers = 3 + num_expert = 64 + num_groups = 8 + + num_replicas = 64 + num_nodes = 4 + num_gpus = 4 * 8 + + # model_tokens_per_expert_stats_list = np.ones( + # (num_hidden_layers, num_expert), dtype=int) + + model_tokens_per_expert_stats_list = np.random.randint(low=1, high=10, size=(num_hidden_layers, num_expert)) + + phy2log, phyrank, logcnt = rebalance_experts( + model_tokens_per_expert_stats_list, + num_replicas, + num_groups, + num_nodes, + num_gpus, + ) + print(phy2log) + print(phyrank) + print(logcnt) + + +if __name__ == "__main__": + main() diff --git a/fastdeploy/eplb/experts_manager.py b/fastdeploy/eplb/experts_manager.py new file mode 100644 index 000000000..cad2e92be --- /dev/null +++ b/fastdeploy/eplb/experts_manager.py @@ -0,0 +1,602 @@ +""" +redundant expert manger +""" + +import threading +import time +from http import HTTPStatus +from multiprocessing import Pipe, Process, shared_memory + +import numpy as np +import requests + +from fastdeploy.eplb.async_expert_loader import load_model_weights_process +from fastdeploy.eplb.eplb import rebalance_experts +from fastdeploy.eplb.utils import RearrangeExpertState, RedundantExpertWorkload +from fastdeploy.utils import envs, get_logger + + +class RedundantExpertManager: + """ + RedundantExpertManger + """ + + def __init__(self, rank=0, ep_size=32, fd_config=None): + self.logger = get_logger("eplb_expert_manager", "eplb_{0}.log".format(rank)) + + self.rank = rank + self.ep_size = ep_size + self.fd_config = fd_config + self.eplb_config = fd_config.eplb_config + self.api_user = self.eplb_config.redundant_expert_api_user + self.api_passwd = self.eplb_config.redundant_expert_api_password + self.num_hidden_layers = self.eplb_config.model_config.num_layers + self.num_logical_experts = self.eplb_config.model_config.moe_num_experts + self.num_redundant_experts = self.eplb_config.redundant_experts_num + + self.num_replicas = self.num_logical_experts + self.num_redundant_experts + self.num_groups = self.num_logical_experts + self.num_nodes = max(ep_size // 8, 1) + self.num_gpus = ep_size + self.expert_per_rank = self.num_replicas // ep_size + assert ( + self.num_replicas % ep_size == 0 + ), f"num_replicas must be divisible by ep_size, \ + but got num_replicas = {self.num_replicas}, ep_size = {ep_size}" + + self.model_ep_rank_to_expert_id_list = np.full( + ( + self.num_hidden_layers, + self.num_logical_experts + self.num_redundant_experts, + ), + -1, + dtype=np.int32, + ) + self.model_expert_id_to_ep_rank_array = np.full( + ( + self.num_hidden_layers, + self.num_logical_experts, + self.num_redundant_experts + 1, + ), + -1, + dtype=np.int32, + ) + self.model_expert_in_rank_num_list = np.zeros( + (self.num_hidden_layers, self.num_logical_experts), dtype=np.int32 + ) + + # backup info + self.last_model_ep_rank_to_expert_id_list = np.full( + ( + self.num_hidden_layers, + self.num_logical_experts + self.num_redundant_experts, + ), + -1, + dtype=np.int32, + ) + self.last_model_expert_id_to_ep_rank_array = np.full( + ( + self.num_hidden_layers, + self.num_logical_experts, + self.num_redundant_experts + 1, + ), + -1, + dtype=np.int32, + ) + self.last_model_expert_in_rank_num_list = np.zeros( + (self.num_hidden_layers, self.num_logical_experts), dtype=np.int32 + ) + + self.model_tokens_per_expert_stats_list = np.ones( + (self.num_hidden_layers, self.num_logical_experts), dtype=np.int32 + ) + self.caculate_expert_rank_table(True) + + self.dp_rank_address = None + self.need_allgather_load_weight_result = False + self.load_weight_begin_ts = 0 + self.load_weight_timeout = 300 # 5min + self.need_rearrange_expert = False + self.need_update_expert_tokens_stat = True + self.http_timeout = 1 + # 重置重排状态: 'done' -> 'free' + self.rearrange_end_ts = 0 + self.rearrange_reset_interval = 300 + + self.tensor_infos = None + + self.parent_data_conn, child_data_conn = Pipe() + self.parent_mg_conn, child_mg_conn = Pipe() + Process( + target=load_model_weights_process, + name=f"eplb::async_load_model_{rank}", + args=( + self.rank, + self.expert_per_rank, + self.fd_config.model_config.moe_layer_start_index, + self.eplb_config.moe_quant_type, + child_data_conn, + child_mg_conn, + ), + ).start() + child_data_conn.close() + child_mg_conn.close() + + listen_signal_thread = threading.Thread(target=self.listen_rearrange_expert_signal, args=(), daemon=True) + listen_signal_thread.start() + + self.logger.info( + f"redundant_expert: RedundantExpertManager init success, rank {rank}, \ + strategy {self.eplb_config.redundant_expert_eplb_strategy}" + ) + + def get_unique_name(self, name): + return f"{envs.get_unique_name(name + '_dprank_' + str(self.rank))}" + + def get_ep_rank_to_expert_id_list(self): + """ + get_ep_rank_to_expert_id_list + """ + return ( + self.model_ep_rank_to_expert_id_list, + self.model_expert_id_to_ep_rank_array, + self.model_expert_in_rank_num_list, + ) + + def listen_rearrange_expert_signal(self): + """ + listen_rearrange_expert_signal + """ + if self.rank == 0: + rearrange_experts_ips_size = np.zeros([1], dtype=np.int32) + shm_rearrange_experts_ips_size = shared_memory.SharedMemory( + create=False, + size=rearrange_experts_ips_size.nbytes, + name=self.get_unique_name("rearrange_experts_ips_size"), + ) + rearrange_experts_ips_size_array = np.ndarray( + rearrange_experts_ips_size.shape, + dtype=rearrange_experts_ips_size.dtype, + buffer=shm_rearrange_experts_ips_size.buf, + ) + shm_rearrange_experts_ips_list = shared_memory.SharedMemory( + create=False, + size=1024, + name=self.get_unique_name("rearrange_experts_ips_list"), + ) + + rearrange_experts_status = np.zeros([1], dtype=np.int32) + shm_rearrange_experts_status = shared_memory.SharedMemory( + create=False, + size=rearrange_experts_status.nbytes, + name=self.get_unique_name("rearrange_experts_status"), + ) + rearrange_experts_status_array = np.ndarray( + rearrange_experts_status.shape, + dtype=rearrange_experts_status.dtype, + buffer=shm_rearrange_experts_status.buf, + ) + + signal_update_weight_from_disk = np.zeros([1], dtype=np.int32) + shm_signal_update_weight_from_disk = shared_memory.SharedMemory( + create=False, + size=signal_update_weight_from_disk.nbytes, + name=self.get_unique_name("signal_update_weight_from_disk"), + ) + signal_update_weight_from_disk_array = np.ndarray( + signal_update_weight_from_disk.shape, + dtype=signal_update_weight_from_disk.dtype, + buffer=shm_signal_update_weight_from_disk.buf, + ) + + experts_token_stats = np.zeros((self.num_hidden_layers, 64), dtype=np.int32) + shm_all_experts_token_stats = shared_memory.SharedMemory( + create=False, + size=experts_token_stats.nbytes, + name=self.get_unique_name("all_experts_token_stats"), + ) + + while True: + if self.rank == 0: + now = int(time.time()) + if rearrange_experts_ips_size_array[0] > 0: + # step 1. all reduce experts token stats + address = bytes(shm_rearrange_experts_ips_list.buf[: rearrange_experts_ips_size_array[0]]).decode( + "utf-8" + ) + self.logger.info(f"redundant_expert: all rank ips {address}") + rearrange_experts_ips_size_array[0] = 0 + rearrange_experts_status_array[0] = RearrangeExpertState.doing.value + + self.dp_rank_address = address.strip().split(";") + if self.allreduce_experts_stat(): + self.need_allgather_load_weight_result = True + self.load_weight_begin_ts = now + self.logger.info("redundant_expert: all-reduce experts stats success") + else: + rearrange_experts_status_array[0] = RearrangeExpertState.free.value + self.logger.warning("redundant_expert: all-reduce experts stats fail") + elif self.need_allgather_load_weight_result and self.allreduce_load_weight_result(): + # step 3. all reduce the result of load weight from disk + self.need_allgather_load_weight_result = False + rearrange_experts_status_array[0] = RearrangeExpertState.load_succ.value + self.rearrange_end_ts = now + if rearrange_experts_status_array[0] > 1 and ( + now - self.rearrange_end_ts > self.rearrange_reset_interval + ): + # reset rearrange status + rearrange_experts_status_array[0] = RearrangeExpertState.free.value + + if signal_update_weight_from_disk_array[0] == 1: + # step 2. async load weight: disk -> memory + expert_token_stats = np.ndarray( + experts_token_stats.shape, + dtype=experts_token_stats.dtype, + buffer=shm_all_experts_token_stats.buf, + ) + self.model_tokens_per_expert_stats_list[:] = expert_token_stats[:] + self.caculate_expert_rank_table() + self.update_weight_from_disk() + signal_update_weight_from_disk_array[0] = 0 + time.sleep(0.5) + + def caculate_expert_rank_table(self, is_init=False): + """ + caculate_expert_rank_table + """ + num_groups = self.num_groups + num_nodes = self.num_nodes + num_gpus = self.num_gpus + eplb_strategy = self.eplb_config.redundant_expert_eplb_strategy + if is_init: + num_groups = 1 + num_nodes = 2 + num_gpus = 2 * 8 + eplb_strategy = "" + # eplb + rank_expert_list, logical_to_physical_map, expert_count = rebalance_experts( + self.model_tokens_per_expert_stats_list, + self.num_replicas, + num_groups, + num_nodes, + num_gpus, + eplb_strategy, + ) + + # backup info + self.last_model_ep_rank_to_expert_id_list[:] = self.model_ep_rank_to_expert_id_list[:] + self.last_model_expert_id_to_ep_rank_array[:] = self.model_expert_id_to_ep_rank_array[:] + self.last_model_expert_in_rank_num_list[:] = self.model_expert_in_rank_num_list[:] + + # update model info + self.model_ep_rank_to_expert_id_list[:] = rank_expert_list[:] + self.model_expert_id_to_ep_rank_array.fill(-1) + self.model_expert_id_to_ep_rank_array[..., : logical_to_physical_map.shape[-1]] = logical_to_physical_map[:] + self.model_expert_in_rank_num_list[:] = expert_count[:] + + if self.rank == 0: + workload = RedundantExpertWorkload() + workload.tokens_per_expert_stats_list = self.model_tokens_per_expert_stats_list.tolist() + workload.ep_rank_to_expert_id_list = rank_expert_list.tolist() + workload.expert_id_to_ep_rank_array = logical_to_physical_map.tolist() + workload.expert_in_rank_num_list = expert_count.tolist() + self.logger.info(workload.dump()) + + def update_weight_from_disk(self): + """ + update_weight_from_disk + """ + begin_time = time.time() + result_update_weight_from_disk = np.zeros([1], dtype=np.int32) + shm_result_update_weight_from_disk = shared_memory.SharedMemory( + create=False, + size=result_update_weight_from_disk.nbytes, + name=self.get_unique_name("result_update_weight_from_disk"), + ) + result_update_weight_from_disk_array = np.ndarray( + result_update_weight_from_disk.shape, + dtype=result_update_weight_from_disk.dtype, + buffer=shm_result_update_weight_from_disk.buf, + ) + result_update_weight_from_disk_array[0] = 0 + + self.logger.info(f"redundant_expert: update_weight_from_disk send to async process, rank {self.rank}") + self.parent_mg_conn.send( + { + "old_model_ep_rank_to_expert_id_list": self.last_model_ep_rank_to_expert_id_list, + "new_model_ep_rank_to_expert_id_list": self.model_ep_rank_to_expert_id_list, + } + ) + self.logger.info(f"redundant_expert: update_weight_from_disk recv from async process, rank {self.rank}") + response = self.parent_data_conn.recv() + self.tensor_infos = response["weights"] + + # 更新权重加载结果 + result_update_weight_from_disk_array[0] = 1 if response["result"] else -1 + self.logger.info( + "redundant_expert: update_weight_from_disk end, rank" + + f" {self.rank} {response['result']}, cost {int(time.time() - begin_time)}s" + ) + + def allreduce_experts_stat(self): + """ + 专家负载 + """ + if not self.allgather_expert_token_stats(): + return False + return self.broadcast_expert_token_stats() + + def allgather_expert_token_stats(self): + """ + allgather_expert_token_stats + """ + success_count = 0 + expert_token_stats = np.zeros((self.num_hidden_layers, self.num_logical_experts), dtype=np.int32) + for addr in self.dp_rank_address: + try: + # TODO: 请求失败重试 + params = {"user": self.api_user, "passwd": self.api_passwd} + res = requests.post( + f"http://{addr}/get_per_expert_tokens_stats", + json=params, + timeout=self.http_timeout, + ) + if res.status_code != HTTPStatus.OK: + self.logger.warning( + "redundant_expert: allgather_expert_token_stats fail. " + + f"addr {addr}, res {res.status_code} {res.json()}" + ) + break + success_count += 1 + expert_token_stats += np.array(res.json()["data"], dtype=np.int32) + except Exception as e: + self.logger.error(f"redundant_expert: allgather_expert_token_stats fail. addr {addr}, error {e}") + if success_count == len(self.dp_rank_address): + self.need_rearrange_expert = True + self.model_tokens_per_expert_stats_list[:] = expert_token_stats[:] + self.logger.info("redundant_expert: allgather_expert_token_stats success") + return True + self.logger.info( + "redundant_expert: allgather_expert_token_stats fail. " + + f"succ {success_count} total {len(self.dp_rank_address)}" + ) + return False + + def broadcast_expert_token_stats(self): + """ + broadcast_expert_token_stats + """ + success_count = 0 + for addr in self.dp_rank_address: + try: + params = { + "user": self.api_user, + "passwd": self.api_passwd, + "action": "recv_expert_weight", + "data": self.model_tokens_per_expert_stats_list.tolist(), + } + res = requests.post( + f"http://{addr}/rearrange_experts", + json=params, + timeout=self.http_timeout, + ) + if res.status_code != HTTPStatus.OK: + self.logger.warning( + "redundant_expert: broadcast_expert_token_stats fail. " + + f"addr {addr}, res {res.status_code} {res.json()}" + ) + break + success_count += 1 + except Exception as e: + self.logger.error( + f"redundant_expert: broadcast_expert_token_stats request fail. addr {addr}, error {e}" + ) + if success_count == len(self.dp_rank_address): + self.logger.info("redundant_expert: broadcast_expert_token_stats success") + return True + self.logger.info( + "redundant_expert: broadcast_expert_token_stats failed, " + + f"succ {success_count} total {len(self.dp_rank_address)}" + ) + return False + + def allreduce_load_weight_result(self): + """ + 权重加载结果 + """ + if int(time.time()) - self.load_weight_begin_ts > self.load_weight_timeout: + self.logger.info(f"redundant_expert: allreduce_load_weight_result timeout {self.load_weight_timeout}s") + return True + + all_success, exist_fail = self.allgather_load_weight_result() + if exist_fail: + # 如果有DP权重加载异常,结束本次重排 + self.logger.warning("redundant_expert: allreduce_load_weight_result exist fail, terminate this rearrange") + return True + if not all_success: + self.logger.info("redundant_expert: allreduce_load_weight_result waiting") + return False + # self.broadcast_load_weight_success() + if not exist_fail and all_success: + # prefill需要等待调度屏蔽 + if ( + self.fd_config.splitwise_role == "decode" + or not self.eplb_config.redundant_expert_enable_schedule_cordon + ): + self.logger.info("redundant_expert: allreduce_load_weight_result success, notify infer.py") + signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32) + shm_signal_update_weight_from_tensor = shared_memory.SharedMemory( + create=False, + size=signal_update_weight_from_tensor.nbytes, + name=self.get_unique_name("signal_update_weight_from_tensor"), + ) + signal_update_weight_from_tensor_array = np.ndarray( + signal_update_weight_from_tensor.shape, + dtype=signal_update_weight_from_tensor.dtype, + buffer=shm_signal_update_weight_from_tensor.buf, + ) + signal_update_weight_from_tensor_array[0] = 1 + return True + + def allgather_load_weight_result(self): + """ + allgather_load_weight_result + """ + all_success, exist_fail = False, False + + success_count, fail_count = 0, 0 + for addr in self.dp_rank_address: + try: + params = { + "user": self.api_user, + "passwd": self.api_passwd, + "action": "check_load_weight_result", + } + res = requests.post( + f"http://{addr}/check_redundant", + json=params, + timeout=self.http_timeout, + ) + if res.status_code != HTTPStatus.OK: + self.logger.warning( + "redundant_expert: allgather_load_weight_result fail. " + + f"addr {addr}, res {res.status_code} {res.json()}" + ) + break + result = res.json()["data"] + self.logger.info( + f"redundant_expert: allgather_load_weight_result success. addr {addr}, result {result}" + ) + if result == 1: + success_count += 1 + elif result == -1: + fail_count += 1 + self.logger.error( + f"redundant_expert: allgather_load_weight_result fail. addr {addr}, result {result}" + ) + exist_fail = True + except Exception as e: + self.logger.error(f"redundant_expert: allgather_load_weight_result error. addr {addr}, error {e}") + if success_count == len(self.dp_rank_address): + self.logger.info("redundant_expert: allgather_load_weight_result all success") + all_success = True + else: + self.logger.info( + "redundant_expert: allgather_load_weight_result not all ready, " + + f"succ {success_count} fail {fail_count} total {len(self.dp_rank_address)}" + ) + return all_success, exist_fail + + +def init_shared_memory_for_eplb_rank0(rank): + rearrange_experts_ips_size = np.zeros([1], dtype=np.int32) + shm_rearrange_experts_ips_size = shared_memory.SharedMemory( + create=True, + size=rearrange_experts_ips_size.nbytes, + name=f"{envs.get_unique_name('rearrange_experts_ips_size_dprank' + rank)}", + ) + rearrange_experts_ips_size_array = np.ndarray( + rearrange_experts_ips_size.shape, + dtype=rearrange_experts_ips_size.dtype, + buffer=shm_rearrange_experts_ips_size.buf, + ) + shm_rearrange_experts_ips_list = shared_memory.SharedMemory( + create=True, + size=envs.FD_REDUNDANT_EXPERT_IP_SHM_SIZE, + name=f"{envs.get_unique_name('rearrange_experts_ips_list_dprank' + rank)}", + ) + # 记录专家重排状态 + rearrange_experts_status = np.zeros([1], dtype=np.int32) + shm_rearrange_experts_status = shared_memory.SharedMemory( + create=True, + size=rearrange_experts_status.nbytes, + name=f"{envs.get_unique_name('rearrange_experts_status_dprank' + rank)}", + ) + rearrange_experts_status_array = np.ndarray( + rearrange_experts_status.shape, dtype=rearrange_experts_status.dtype, buffer=shm_rearrange_experts_status.buf + ) + # 接收更新权重的信号 + signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32) + shm_signal_update_weight_from_tensor = shared_memory.SharedMemory( + create=True, + size=signal_update_weight_from_tensor.nbytes, + name=f"{envs.get_unique_name('signal_update_weight_from_tensor_dprank' + rank) }", + ) + signal_update_weight_from_tensor_array = np.ndarray( + signal_update_weight_from_tensor.shape, + dtype=signal_update_weight_from_tensor.dtype, + buffer=shm_signal_update_weight_from_tensor.buf, + ) + return ( + rearrange_experts_ips_size_array, + shm_rearrange_experts_ips_list, + rearrange_experts_status_array, + signal_update_weight_from_tensor_array, + ) + + +def init_shared_memory_for_eplb_each_rank(fd_config, rank): + # 记录专家负载 + num_layers = fd_config.model_config.num_hidden_layers + num_experts = fd_config.model_config.moe_num_experts + experts_token_stats = np.zeros((num_layers, num_experts), dtype=np.int32) + shm_local_experts_token_stats = shared_memory.SharedMemory( + create=True, + size=experts_token_stats.nbytes, + name=f"{envs.get_unique_name('local_experts_token_stats_dprank' + rank)}", + ) + local_experts_token_stats_array = np.ndarray( + experts_token_stats.shape, dtype=experts_token_stats.dtype, buffer=shm_local_experts_token_stats.buf + ) + # TODO: 全局专家负载状态是一样的,节点上的所有DP可以共用一份,但需要避免多个DP同时更新 + shm_all_experts_token_stats = shared_memory.SharedMemory( + create=True, + size=experts_token_stats.nbytes, + name=f"{envs.get_unique_name('all_experts_token_stats_dprank' + rank)}", + ) + expert_tokens_stats_array = np.ndarray( + experts_token_stats.shape, dtype=experts_token_stats.dtype, buffer=shm_all_experts_token_stats.buf + ) + # 接收加载权重的信号 + signal_update_weight_from_disk = np.zeros([1], dtype=np.int32) + shm_signal_update_weight_from_disk = shared_memory.SharedMemory( + create=True, + size=signal_update_weight_from_disk.nbytes, + name=f"{envs.get_unique_name('signal_update_weight_from_disk_dprank' + rank)}", + ) + signal_update_weight_from_disk_array = np.ndarray( + signal_update_weight_from_disk.shape, + dtype=signal_update_weight_from_disk.dtype, + buffer=shm_signal_update_weight_from_disk.buf, + ) + # 记录加载权重的结果 + result_update_weight_from_disk = np.zeros([1], dtype=np.int32) + shm_result_update_weight_from_disk = shared_memory.SharedMemory( + create=True, + size=result_update_weight_from_disk.nbytes, + name=f"{envs.get_unique_name('result_update_weight_from_disk_dprank' + rank)}", + ) + result_update_weight_from_disk_array = np.ndarray( + result_update_weight_from_disk.shape, + dtype=result_update_weight_from_disk.dtype, + buffer=shm_result_update_weight_from_disk.buf, + ) + # 接收清零专家负载的信号 + signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32) + shm_signal_clear_experts_token_stats = shared_memory.SharedMemory( + create=True, + size=signal_clear_experts_token_stats.nbytes, + name=f"{envs.get_unique_name('signal_clear_experts_token_stats_dprank' + rank)}", + ) + signal_clear_experts_token_stats_array = np.ndarray( + signal_clear_experts_token_stats.shape, + dtype=signal_clear_experts_token_stats.dtype, + buffer=shm_signal_clear_experts_token_stats.buf, + ) + return ( + local_experts_token_stats_array, + expert_tokens_stats_array, + signal_update_weight_from_disk_array, + result_update_weight_from_disk_array, + signal_clear_experts_token_stats_array, + ) diff --git a/fastdeploy/eplb/utils.py b/fastdeploy/eplb/utils.py new file mode 100644 index 000000000..48a8321bb --- /dev/null +++ b/fastdeploy/eplb/utils.py @@ -0,0 +1,60 @@ +"""eplb utilities""" + +import json +import os +import time +from enum import Enum + + +class RedundantExpertWorkload: + """Redundant Expert Workload""" + + def __init__(self, redundant_expert_meta_dir="/tmp/redundant_expert_meta"): + self.update_timestamp = time.time() + self.tokens_per_expert_stats_list = None + self.ep_rank_to_expert_id_list = None + self.expert_id_to_ep_rank_array = None + self.expert_in_rank_num_list = None + self.cost_milliseconds = 0 + self.meta_file_name = f"{redundant_expert_meta_dir}/rearrange-experts.json" + if not os.path.exists(redundant_expert_meta_dir): + os.makedirs(redundant_expert_meta_dir, exist_ok=True) + + def __json__(self): + return self.__dict__ + + def dump(self): + """Dump the object to a JSON file.""" + begin = time.time() + try: + with open(self.meta_file_name, "w") as fout: + json.dump(self.__dict__, fout) + except Exception as e: + return f"redundant_expert: dump expert workload failed, {e}" + cost_time = int((time.time() - begin) * 1000 * 1000) + return f"redundant_expert: dump expert workload result in {cost_time} us" + + def load(self): + """Load the object from a JSON file.""" + if not os.path.exists(self.meta_file_name): + return {}, f"redundant_expert: file {self.meta_file_name} is not exists" + try: + with open(self.meta_file_name, "r") as fin: + meta = json.load(fin) + self.__dict__.update(meta) + return self.__json__(), "ok" + except Exception as e: + return {}, f"redundant_expert: load file {self.meta_file_name} failed, {e}" + + +class RearrangeExpertState(Enum): + """RearrangeExpertState""" + + free = 0 + doing = 1 + load_succ = 2 # load weight from disk success + done = 3 + + +if __name__ == "__main__": + print(RedundantExpertWorkload("/tmp").load()) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index adcc24f41..4f492db96 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -18,6 +18,7 @@ import argparse import json import os import time +from multiprocessing import shared_memory from typing import Tuple import numpy as np @@ -30,6 +31,7 @@ from fastdeploy.config import ( CacheConfig, DeviceConfig, EarlyStopConfig, + EPLBConfig, ErnieArchitectures, FDConfig, GraphOptimizationConfig, @@ -40,6 +42,14 @@ from fastdeploy.config import ( SpeculativeConfig, StructuredOutputsConfig, ) +from fastdeploy.eplb.async_expert_loader import ( + MODEL_MAIN_NAME, + REARRANGE_EXPERT_MAGIC_NUM, + create_mmap, + load_tensor_from_shm_mem, +) +from fastdeploy.eplb.experts_manager import RedundantExpertManager +from fastdeploy.eplb.utils import RearrangeExpertState from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue from fastdeploy.inter_communicator import ( ExistTaskStatus, @@ -147,6 +157,7 @@ class PaddleDisWorkerProc: self.parallel_config = fd_config.parallel_config self.cache_config = fd_config.cache_config self.scheduler_config = fd_config.scheduler_config + self.eplb_config = fd_config.eplb_config # TODO(gongshaotian): Use worker factory to get worker self.worker = get_worker(fd_config=fd_config, local_rank=self.local_rank, rank=self.ranks) @@ -245,6 +256,18 @@ class PaddleDisWorkerProc: create=False, ) + def update_weights_from_tensor(self, mmap_infos): + """ + update_weights_from_tensor + """ + state_dicts = load_tensor_from_shm_mem(self.experts_manager.tensor_infos, mmap_infos[MODEL_MAIN_NAME], logger) + rank_expert_list, logical_to_physical_map, expert_count = self.experts_manager.get_ep_rank_to_expert_id_list() + self.worker.get_model().redundant_table_manger.update_expert_rank_table( + rank_expert_list, logical_to_physical_map, expert_count + ) + # TO BE FIXED + self.worker.get_model().update_state_dict(state_dicts) + def _broadcast_model_weights_signal(self, src: int, group) -> int: model_weights_signal_tensor = paddle.full(shape=[1], fill_value=self.model_weights_signal[0], dtype="int32") paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group) @@ -260,7 +283,63 @@ class PaddleDisWorkerProc: """Main event loop for Paddle Distributed Workers. TODO(gongshaotian): support remote calling of functions that control worker. """ + if self.eplb_config.enable_redundant_experts: + self.last_dump_expert_workload_ts = 0 + self.experts_manager = RedundantExpertManager( + rank=self.local_rank, ep_size=self.ranks, fd_config=self.fd_config + ) + num_layers = self.fd_config.model_config.num_hidden_layers + num_experts = self.fd_config.model_config.moe_num_experts + expert_token_stats = np.zeros((num_layers, num_experts), dtype=np.int32) + shm_local_experts_token_stats = shared_memory.SharedMemory( + create=False, + size=expert_token_stats.nbytes, + name=f"{envs.get_unique_name('local_experts_token_stats_dprank' + self.local_rank)}", + ) + expert_tokens_stats_array = np.ndarray( + expert_token_stats.shape, dtype=expert_token_stats.dtype, buffer=shm_local_experts_token_stats.buf + ) + signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32) + shm_signal_clear_experts_token_stats = shared_memory.SharedMemory( + create=False, + size=signal_clear_experts_token_stats.nbytes, + name=f"{envs.get_unique_name('signal_clear_experts_token_stats_dprank' + self.local_rank)}", + ) + signal_clear_experts_token_stats_array = np.ndarray( + signal_clear_experts_token_stats.shape, + dtype=signal_clear_experts_token_stats.dtype, + buffer=shm_signal_clear_experts_token_stats.buf, + ) + if self.local_rank == 0: + signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32) + shm_signal_update_weight_from_tensor = shared_memory.SharedMemory( + create=False, + size=signal_update_weight_from_tensor.nbytes, + name=f"{envs.get_unique_name('signal_update_weight_from_tensor_dprank' + self.local_rank)}", + ) + signal_update_weight_from_tensor_array = np.ndarray( + signal_update_weight_from_tensor.shape, + dtype=signal_update_weight_from_tensor.dtype, + buffer=shm_signal_update_weight_from_tensor.buf, + ) + rearrange_experts_status = np.zeros([1], dtype=np.int32) + shm_rearrange_experts_status = shared_memory.SharedMemory( + create=False, + size=rearrange_experts_status.nbytes, + name=f"{envs.get_unique_name('rearrange_experts_status_dprank' + self.local_rank)}", + ) + + rearrange_experts_status_array = np.ndarray( + rearrange_experts_status.shape, + dtype=rearrange_experts_status.dtype, + buffer=shm_rearrange_experts_status.buf, + ) + + expert_workload_dump_interval = envs.FD_REDUNDANT_EXPERT_DUMP_WORKLOAD_INTERVAL + mmap_infos = create_mmap( + [MODEL_MAIN_NAME], self.local_rank, self.ranks, shm_uuid=os.getenv("SHM_UUID", ""), logger=logger + ) # Currently, only support single node self.nnode = int((self.parallel_config.tensor_parallel_size + 7) // 8) req_ids = [] @@ -268,6 +347,44 @@ class PaddleDisWorkerProc: local_rank = self.local_rank % self.parallel_config.tensor_parallel_size self.model_weights_signal = np.zeros([1], dtype=np.int32) while True: + if self.eplb_config.enable_redundant_experts: + rearrange_time = time.time() + # 获取专家负载 + if expert_tokens_stats_array is not None and ( + int(rearrange_time) - self.last_dump_expert_workload_ts > expert_workload_dump_interval + ): + self.last_dump_expert_workload_ts = int(rearrange_time) + clear_stat = False + if signal_clear_experts_token_stats_array[0] == 1: + clear_stat = True + signal_clear_experts_token_stats_array[0] = 0 + ( + new_stats_array, + _, + _, + _, + ) = self.worker.get_model().redundant_table_manger.get_expert_tokens_stats(clear_stat=clear_stat) + expert_tokens_stats_array[:] = new_stats_array[:] + elif expert_tokens_stats_array is None: + logger.warning("redundant_expert: expert_tokens_stats_array not init") + + # 所有DP同步更新权重 + broadcast_value = 0 + if self.local_rank == 0 and signal_update_weight_from_tensor_array[0] == 1: + logger.info("redundant_expert: update_weight_from_tensor broadcast signal") + signal_update_weight_from_tensor_array[0] = 0 + broadcast_value = REARRANGE_EXPERT_MAGIC_NUM + data = paddle.to_tensor([broadcast_value]) + paddle.distributed.broadcast(data, 0) + if data[0] == REARRANGE_EXPERT_MAGIC_NUM: + self.update_weights_from_tensor(mmap_infos) + logger.info( + f"redundant_expert: update_weight_from_tensor success, cost {(time.time() - rearrange_time)*1000}ms" + ) + paddle.distributed.barrier() + if self.local_rank == 0: + rearrange_experts_status_array[0] = RearrangeExpertState.done.value + logger.info("redundant_expert: done") if self.local_rank % self.parallel_config.tensor_parallel_size == 0: if self.model_weights_status.value[0] != ModelWeightsStatus.NORMAL: self.model_weights_signal[0] = int(self.model_weights_status.value[0]) @@ -754,6 +871,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: plas_attention_config = PlasAttentionConfig(args.plas_attention_config) early_stop_config = EarlyStopConfig(args.early_stop_config) + eplb_config = EPLBConfig() structured_outputs_config: StructuredOutputsConfig = StructuredOutputsConfig(args=vars(args)) @@ -819,6 +937,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: ips=args.ips, plas_attention_config=plas_attention_config, structured_outputs_config=structured_outputs_config, + eplb_config=eplb_config, ) update_fd_config_for_mm(fd_config) if fd_config.load_config.load_choices == "default_v1" and not v1_loader_support(fd_config):