[worker_process.py] Modify some variable names (#4749)
@@ -340,11 +340,14 @@ class PaddleDisWorkerProc:
         mmap_infos = create_mmap(
             [MODEL_MAIN_NAME], self.local_rank, self.ranks, shm_uuid=os.getenv("SHM_UUID", ""), logger=logger
         )
+
+        tp_size = self.parallel_config.tensor_parallel_size
         # Currently, only support single node
-        self.nnode = int((self.parallel_config.tensor_parallel_size + 7) // 8)
+        self.nnode = int((tp_size + 7) // 8)
         req_ids = []
         num_running_requests = 0
-        local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
+        tp_rank = self.local_rank % tp_size
+
         self.model_weights_signal = np.zeros([1], dtype=np.int32)
         while True:
             if self.eplb_config.enable_redundant_experts:
@@ -385,35 +388,34 @@ class PaddleDisWorkerProc:
                     if self.local_rank == 0:
                         rearrange_experts_status_array[0] = RearrangeExpertState.done.value
                         logger.info("redundant_expert: done")
-            if self.local_rank % self.parallel_config.tensor_parallel_size == 0:
+            if tp_rank == 0:
                 if self.model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
                     self.model_weights_signal[0] = int(self.model_weights_status.value[0])
             if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.enable_expert_parallel:
                 self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                     src=0, group=self.parallel_config.ep_group
                 )
-            if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.tensor_parallel_size > 1:
+            if self.fd_config.load_config.dynamic_load_weight and tp_size > 1:
                 self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                     src=0, group=self.parallel_config.tp_group
                 )

             self.insert_step = False
             req_dicts = None
-            local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-            self.worker_healthy_live_signal.value[local_rank % self.max_chips_per_node] = int(time.time())
+            self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time())

             # The first worker detects whether there are tasks in the task queue
-            if local_rank == 0:
+            if tp_rank == 0:
                 if self.task_queue.num_tasks() > 0:
                     if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
                         self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
                     ):
-                        if self.nnode > 1 and self.parallel_config.tensor_parallel_size > self.max_chips_per_node:
+                        if self.nnode > 1 and tp_size > self.max_chips_per_node:
                             self.task_queue.read_finish_flag.set(1)
                         else:
                             self.exist_task_signal.value[0] = ExistTaskStatus.EXIST

-            if self.parallel_config.tensor_parallel_size > 1:
+            if tp_size > 1:
                 # Synchronize the signal for other workers
                 self._tp_barrier_wait()
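For context, the change hoists the repeated lookup of self.parallel_config.tensor_parallel_size into a local tp_size and renames the per-loop local_rank alias to tp_rank. A minimal, self-contained sketch of that aliasing pattern is shown below; the function and parameter names here are hypothetical illustrations, not the FastDeploy implementation.

# Hypothetical sketch (not FastDeploy code): compute the repeated expressions
# once up front, then reuse the locals inside the worker loop.
def event_loop_sketch(local_rank: int, tensor_parallel_size: int, max_chips_per_node: int = 8) -> None:
    tp_size = tensor_parallel_size      # replaces repeated parallel_config.tensor_parallel_size lookups
    tp_rank = local_rank % tp_size      # this worker's rank within its tensor-parallel group
    nnode = (tp_size + 7) // 8          # node count, assuming up to 8 chips per node

    if tp_rank == 0:
        # Only the first worker of each tensor-parallel group polls the task queue.
        multi_node = nnode > 1 and tp_size > max_chips_per_node
        print(f"tp_rank 0 polls the task queue (multi_node={multi_node})")


event_loop_sketch(local_rank=3, tensor_parallel_size=8)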