diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py
index a6b611516..4f45413ff 100644
--- a/fastdeploy/rl/dynamic_weight_manager.py
+++ b/fastdeploy/rl/dynamic_weight_manager.py
@@ -68,11 +68,11 @@ class DynamicWeightManager:
         paddle.device.cuda.empty_cache()
 
         # step1 : restart paddle process group
-        if not self.first_load:
-            paddle.distributed.restart_process_group()
-            paddle.distributed.restart_process_group(self.parallel_config.tp_group)
-            if self.parallel_config.enable_expert_parallel:
-                paddle.distributed.restart_process_group(self.parallel_config.ep_group)
+        # if not self.first_load:
+        #     paddle.distributed.restart_process_group()
+        #     paddle.distributed.restart_process_group(self.parallel_config.tp_group)
+        #     if self.parallel_config.enable_expert_parallel:
+        #         paddle.distributed.restart_process_group(self.parallel_config.ep_group)
 
         # step2 : recreat deepep buffer when enable expert parallel
         if self.parallel_config.enable_expert_parallel and not self.first_load:
@@ -136,7 +136,7 @@ class DynamicWeightManager:
             # ep barrier
             paddle.distributed.barrier(self.parallel_config.ep_group)
             # shutdown ep group
-            paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
+            # paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
             paddle.device.cuda.empty_cache()
 
         # step2: release model weight
@@ -149,11 +149,11 @@ class DynamicWeightManager:
         if self.parallel_config.tensor_parallel_size > 1:
             # tp barrier
             paddle.distributed.barrier(self.parallel_config.tp_group)
-            paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
+            # paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
         if self.parallel_config.enable_expert_parallel:
             paddle.distributed.barrier(self.parallel_config.ep_group)
-            paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
-        paddle.distributed.shutdown_process_group()
+            # paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
+        # paddle.distributed.shutdown_process_group()
         self._update_shared_status(pid, ModelWeightsStatus.CLEARED)
 
     def _update_model_from_state(self, state_dict: Dict[str, paddle.Tensor], src_type: str):
@@ -257,7 +257,7 @@ class DynamicWeightManager:
         """
         check model weights status
         """
-        logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}")
+        # logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}")
         while (
             model_weights_status.value[0] != ModelWeightsStatus.NORMAL
             and model_weights_status.value[0] != ModelWeightsStatus.CLEARED
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 0c29ce4d7..4d87f3c1f 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -459,7 +459,7 @@ class PaddleDisWorkerProc:
                 else:
                     paddle.distributed.barrier(self.parallel_config.tp_group)
                 if self.model_weights_signal[0] != ModelWeightsStatus.NORMAL:
-                    logger.info(
+                    logger.debug(
                         f"Rank: {self.local_rank} to update or clear parameters, signal is {self.model_weights_signal[0]}, [-1:clear, 1:update]"
                     )
                     from fastdeploy.rl.dynamic_weight_manager import (
@@ -473,10 +473,10 @@ class PaddleDisWorkerProc:
                         self.worker.model_runner,
                         self.parallel_config.engine_worker_queue_port,
                     )
-                    logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
+                    logger.debug(f"current task queue data: {self.task_queue.num_tasks()}")
                     self.task_queue.clear_data()
                     self.model_weights_signal[0] = ModelWeightsStatus.NORMAL
-                    logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.")
+                    logger.debug(f"Rank: {self.local_rank} has updated or cleared parameters.")
 
             if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1:
                 logger.info(f"Rank: {self.local_rank} Detected new requests.")