diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 3392b26d6..a2c984d4f 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -303,7 +303,8 @@ class EngineService: client_id=0, local_data_parallel_size=self.cfg.parallel_config.data_parallel_size, local_data_parallel_id=min( - self.cfg.worker_num_per_node * self.cfg.node_rank + self.cfg.parallel_config.local_data_parallel_id, + self.cfg.worker_num_per_node // self.cfg.parallel_config.tensor_parallel_size * self.cfg.node_rank + + self.cfg.parallel_config.local_data_parallel_id, self.cfg.parallel_config.data_parallel_size - 1, ), ) diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 6f567be09..5a0da7bc6 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -130,29 +130,28 @@ class ExpertService: create=False, ) self.launched_expert_service_signal.value[local_rank] = 1 + if self.do_profile: + get_profile_block_num = np.zeros([1], dtype=np.int32) + while True: + try: + self.get_profile_block_num_signal = IPCSignal( + name="get_profile_block_num", + array=get_profile_block_num, + dtype=np.int32, + suffix=int(self.cfg.parallel_config.engine_worker_queue_port[0]), + create=False, + ) + break + except: + time.sleep(1) + self.reset_kvcache_blocks() if self.cfg.scheduler_config.splitwise_role != "mixed" or self.cfg.cache_config.enable_prefix_caching: - if self.do_profile: - get_profile_block_num = np.zeros([1], dtype=np.int32) - while True: - try: - self.get_profile_block_num_signal = IPCSignal( - name="get_profile_block_num", - array=get_profile_block_num, - dtype=np.int32, - suffix=int(self.cfg.parallel_config.engine_worker_queue_port[0]), - create=False, - ) - break - except: - time.sleep(1) - self.reset_kvcache_blocks() ipc_signal_suffix_cache = self.cfg.parallel_config.engine_worker_queue_port[local_data_parallel_id] self.cache_manager_processes = self.engine.start_cache_service( self.cfg.local_device_ids, ipc_signal_suffix_cache, ) - console_logger.info( f"Worker processes(rank {local_rank}) are launched with {time.time() - start_time} seconds." )