[Cherry-Pick] [BugFix] [RL] remove shutdown_process_group/restart_process_group for RL (#5433) (#5434)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled

* [fix] remove shutdown_process_group/restart_process_group for RL

* [chore] remove log

* [chore] remove log

* [chore] set log to debug level
This commit is contained in:
Yonghua Li
2025-12-08 19:13:06 +08:00
committed by GitHub
parent d4c16aa63e
commit 31436a35e4
2 changed files with 13 additions and 13 deletions

View File

@@ -68,11 +68,11 @@ class DynamicWeightManager:
paddle.device.cuda.empty_cache()
# step1 : restart paddle process group
if not self.first_load:
paddle.distributed.restart_process_group()
paddle.distributed.restart_process_group(self.parallel_config.tp_group)
if self.parallel_config.enable_expert_parallel:
paddle.distributed.restart_process_group(self.parallel_config.ep_group)
# if not self.first_load:
# paddle.distributed.restart_process_group()
# paddle.distributed.restart_process_group(self.parallel_config.tp_group)
# if self.parallel_config.enable_expert_parallel:
# paddle.distributed.restart_process_group(self.parallel_config.ep_group)
# step2 : recreat deepep buffer when enable expert parallel
if self.parallel_config.enable_expert_parallel and not self.first_load:
@@ -136,7 +136,7 @@ class DynamicWeightManager:
# ep barrier
paddle.distributed.barrier(self.parallel_config.ep_group)
# shutdown ep group
paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
# paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
paddle.device.cuda.empty_cache()
# step2: release model weight
@@ -149,11 +149,11 @@ class DynamicWeightManager:
if self.parallel_config.tensor_parallel_size > 1:
# tp barrier
paddle.distributed.barrier(self.parallel_config.tp_group)
paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
# paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
if self.parallel_config.enable_expert_parallel:
paddle.distributed.barrier(self.parallel_config.ep_group)
paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
paddle.distributed.shutdown_process_group()
# paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
# paddle.distributed.shutdown_process_group()
self._update_shared_status(pid, ModelWeightsStatus.CLEARED)
def _update_model_from_state(self, state_dict: Dict[str, paddle.Tensor], src_type: str):
@@ -257,7 +257,7 @@ class DynamicWeightManager:
"""
check model weights status
"""
logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}")
# logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}")
while (
model_weights_status.value[0] != ModelWeightsStatus.NORMAL
and model_weights_status.value[0] != ModelWeightsStatus.CLEARED

View File

@@ -459,7 +459,7 @@ class PaddleDisWorkerProc:
else:
paddle.distributed.barrier(self.parallel_config.tp_group)
if self.model_weights_signal[0] != ModelWeightsStatus.NORMAL:
logger.info(
logger.debug(
f"Rank: {self.local_rank} to update or clear parameters, signal is {self.model_weights_signal[0]}, [-1:clear, 1:update]"
)
from fastdeploy.rl.dynamic_weight_manager import (
@@ -473,10 +473,10 @@ class PaddleDisWorkerProc:
self.worker.model_runner,
self.parallel_config.engine_worker_queue_port,
)
logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
logger.debug(f"current task queue data: {self.task_queue.num_tasks()}")
self.task_queue.clear_data()
self.model_weights_signal[0] = ModelWeightsStatus.NORMAL
logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.")
logger.debug(f"Rank: {self.local_rank} has updated or cleared parameters.")
if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1:
logger.info(f"Rank: {self.local_rank} Detected new requests.")