diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index df6604f8d..ddb0b355a 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -340,11 +340,14 @@ class PaddleDisWorkerProc:
         mmap_infos = create_mmap(
             [MODEL_MAIN_NAME], self.local_rank, self.ranks, shm_uuid=os.getenv("SHM_UUID", ""), logger=logger
         )
+
+        tp_size = self.parallel_config.tensor_parallel_size
         # Currently, only support single node
-        self.nnode = int((self.parallel_config.tensor_parallel_size + 7) // 8)
+        self.nnode = int((tp_size + 7) // 8)
         req_ids = []
         num_running_requests = 0
-        local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
+        tp_rank = self.local_rank % tp_size
+
         self.model_weights_signal = np.zeros([1], dtype=np.int32)
         while True:
             if self.eplb_config.enable_redundant_experts:
@@ -385,35 +388,34 @@ class PaddleDisWorkerProc:
                 if self.local_rank == 0:
                     rearrange_experts_status_array[0] = RearrangeExpertState.done.value
                     logger.info("redundant_expert: done")

-            if self.local_rank % self.parallel_config.tensor_parallel_size == 0:
+            if tp_rank == 0:
                 if self.model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
                     self.model_weights_signal[0] = int(self.model_weights_status.value[0])
                 if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.enable_expert_parallel:
                     self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                         src=0, group=self.parallel_config.ep_group
                     )
-            if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.tensor_parallel_size > 1:
+            if self.fd_config.load_config.dynamic_load_weight and tp_size > 1:
                 self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                     src=0, group=self.parallel_config.tp_group
                 )

             self.insert_step = False
             req_dicts = None
-            local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-            self.worker_healthy_live_signal.value[local_rank % self.max_chips_per_node] = int(time.time())
+            self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time())

             # The first worker detects whether there are tasks in the task queue
-            if local_rank == 0:
+            if tp_rank == 0:
                 if self.task_queue.num_tasks() > 0:
                     if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
                         self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
                     ):
-                        if self.nnode > 1 and self.parallel_config.tensor_parallel_size > self.max_chips_per_node:
+                        if self.nnode > 1 and tp_size > self.max_chips_per_node:
                             self.task_queue.read_finish_flag.set(1)
                         else:
                             self.exist_task_signal.value[0] = ExistTaskStatus.EXIST

-            if self.parallel_config.tensor_parallel_size > 1:
+            if tp_size > 1:
                 # Synchronize the signal for other workers
                 self._tp_barrier_wait()
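
The refactor only hoists the repeated `self.parallel_config.tensor_parallel_size` lookups into `tp_size` and replaces the per-iteration `local_rank` recomputation with a single `tp_rank`; the underlying arithmetic is unchanged. A minimal standalone sketch of that arithmetic, using illustrative values (`tp_size = 16`, `local_rank = 11`) that are not taken from the patch:

```python
# Sketch of the rank/node arithmetic hoisted by this diff (hypothetical values).
tp_size = 16             # parallel_config.tensor_parallel_size
max_chips_per_node = 8   # the patch hard-codes 8 in the nnode formula
local_rank = 11          # this worker's local rank

# Number of nodes needed to host the tensor-parallel group (ceil division).
nnode = (tp_size + max_chips_per_node - 1) // max_chips_per_node
assert nnode == 2

# Rank within the tensor-parallel group; tp_rank == 0 is the worker that
# polls the task queue and forwards the model-weights signal.
tp_rank = local_rank % tp_size
assert tp_rank == 11

# Slot in the per-node health-signal array wraps at the node boundary.
health_slot = tp_rank % max_chips_per_node
assert health_slot == 3
```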