From 50be19a88a1f3b7fe4a6fe274bd76f7e1ea08f66 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 30 Oct 2025 09:50:49 +0800 Subject: [PATCH] [EP] fix several bugs in data parallel (#4657) * Simplify profiling block setup in expert_service.py Refactor profiling block initialization to avoid duplication. * Update common_engine.py --- fastdeploy/engine/common_engine.py | 3 ++- fastdeploy/engine/expert_service.py | 31 ++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 3392b26d6..a2c984d4f 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -303,7 +303,8 @@ class EngineService: client_id=0, local_data_parallel_size=self.cfg.parallel_config.data_parallel_size, local_data_parallel_id=min( - self.cfg.worker_num_per_node * self.cfg.node_rank + self.cfg.parallel_config.local_data_parallel_id, + self.cfg.worker_num_per_node // self.cfg.parallel_config.tensor_parallel_size * self.cfg.node_rank + + self.cfg.parallel_config.local_data_parallel_id, self.cfg.parallel_config.data_parallel_size - 1, ), ) diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 6f567be09..5a0da7bc6 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -130,29 +130,28 @@ class ExpertService: create=False, ) self.launched_expert_service_signal.value[local_rank] = 1 + if self.do_profile: + get_profile_block_num = np.zeros([1], dtype=np.int32) + while True: + try: + self.get_profile_block_num_signal = IPCSignal( + name="get_profile_block_num", + array=get_profile_block_num, + dtype=np.int32, + suffix=int(self.cfg.parallel_config.engine_worker_queue_port[0]), + create=False, + ) + break + except: + time.sleep(1) + self.reset_kvcache_blocks() if self.cfg.scheduler_config.splitwise_role != "mixed" or self.cfg.cache_config.enable_prefix_caching: - if self.do_profile: - get_profile_block_num = np.zeros([1], dtype=np.int32) - while True: - try: - self.get_profile_block_num_signal = IPCSignal( - name="get_profile_block_num", - array=get_profile_block_num, - dtype=np.int32, - suffix=int(self.cfg.parallel_config.engine_worker_queue_port[0]), - create=False, - ) - break - except: - time.sleep(1) - self.reset_kvcache_blocks() ipc_signal_suffix_cache = self.cfg.parallel_config.engine_worker_queue_port[local_data_parallel_id] self.cache_manager_processes = self.engine.start_cache_service( self.cfg.local_device_ids, ipc_signal_suffix_cache, ) - console_logger.info( f"Worker processes(rank {local_rank}) are launched with {time.time() - start_time} seconds." )