Mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-10-06 00:57:33 +08:00)
[LLM] fix multinode bugs (#2945)
* [LLM] fix multinode bugs
* [LLM] fix multinode bugs
* [LLM] fix multinode bugs
* [LLM] fix ci bugs
* fix ci bugs
* fix ci bugs
@@ -124,19 +124,9 @@ class EngineArgs:
     Ratio of tokens to process in a block.
     """
 
-    dist_init_ip: Optional[str] = None
+    ips: Optional[List[str]] = None
     """
-    The master node ip of multinode deployment
-    """
-
-    nnodes: int = 1
-    """
-    The number of nodes in multinode deployment
-    """
-
-    node_rank: int = 0
-    """
-    The rank of the current node in multinode deployment
+    The ips of multinode deployment
     """
 
     swap_space: float = None

@@ -495,25 +485,11 @@ class EngineArgs:
         # Cluster system parameters group
         system_group = parser.add_argument_group("System Configuration")
         system_group.add_argument(
-            "--dist-init-ip",
-            default=EngineArgs.dist_init_ip,
+            "--ips",
+            type=lambda s: s.split(",") if s else None,
+            default=EngineArgs.ips,
             help=
-            "IP addresses of master node.")
-
-        system_group.add_argument(
-            "--nnodes",
-            type=int,
-            default=EngineArgs.nnodes,
-            help=
-            "The number of all nodes.")
-
-        system_group.add_argument(
-            "--node-rank",
-            type=int,
-            default=EngineArgs.node_rank,
-            help=
-            "node rank id (range [0, nnodes)).")
+            "IP addresses of all nodes participating in distributed inference.")
 
 
         # Performance tuning parameters group

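To see what the new flag does, here is a minimal stand-alone sketch (hypothetical parser; the EngineArgs default is simplified to None) showing how the lambda `type` turns a comma-separated string into a list:

```python
import argparse

# Stand-alone reproduction of the new "--ips" flag from the hunk above.
parser = argparse.ArgumentParser()
system_group = parser.add_argument_group("System Configuration")
system_group.add_argument(
    "--ips",
    type=lambda s: s.split(",") if s else None,  # comma-separated -> list
    default=None,  # simplified stand-in for EngineArgs.ips
    help="IP addresses of all nodes participating in distributed inference.")

print(parser.parse_args(["--ips", "192.168.0.1,192.168.0.2"]).ips)
# ['192.168.0.1', '192.168.0.2']
print(parser.parse_args([]).ips)
# None (single-node default)
```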
@@ -813,9 +789,7 @@ class EngineArgs:
             max_num_seqs=self.max_num_seqs,
             speculative_config=speculative_cfg,
             max_num_batched_tokens=self.max_num_batched_tokens,
-            dist_init_ip=self.dist_init_ip,
-            nnodes=self.nnodes,
-            node_rank=self.node_rank,
+            ips=self.ips,
             use_warmup=self.use_warmup,
             engine_worker_queue_port=self.engine_worker_queue_port,
             limit_mm_per_prompt=self.limit_mm_per_prompt,

@@ -6,7 +6,6 @@
 # You may obtain a copy of the License at
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
-#dist_init_ip
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

@@ -24,7 +23,7 @@ from fastdeploy import envs
 from fastdeploy.platforms import current_platform
 from fastdeploy.scheduler import SchedulerConfig
 from fastdeploy.utils import (ceil_div, check_unified_ckpt, get_host_ip,
-                              is_port_available, get_random_port, llm_logger)
+                              is_port_available, llm_logger)
 
 TaskOption = Literal["generate"]
 

@@ -642,9 +641,7 @@ class Config:
                  max_model_len: int = 8192,
                  max_num_seqs: int = 8,
                  max_num_batched_tokens: Optional[int] = None,
-                 dist_init_ip: str = None,
-                 nnodes: int = 1,
-                 node_rank: int = 0,
+                 ips: str = None,
                  speculative_config: Optional[Dict[str, Any]] = None,
                  graph_optimization_config: Optional[Dict[str, Any]] = None,
                  use_warmup: bool = False,

@@ -700,15 +697,29 @@ class Config:
         self.tokenizer = tokenizer
         self.max_num_batched_tokens = max_num_batched_tokens
         self.tensor_parallel_size = tensor_parallel_size
-        self.dist_init_ip = dist_init_ip
-
-        self.nnode = nnodes
-        self.node_rank = node_rank
-        if self.dist_init_ip is None:
+        self.ips = ips
+
+        if self.ips is None:
             self.master_ip = "0.0.0.0"
+        elif isinstance(self.ips, list):
+            self.master_ip = self.ips[0]
         else:
-            self.master_ip = self.dist_init_ip
-            self.dist_init_addr = f"{self.dist_init_ip}:{get_random_port()}"
+            self.ips = self.ips.split(",")
+            self.master_ip = self.ips[0]
+
+        if self.ips is None:
+            self.nnode = 1
+            self.node_rank = 0
+        else:
+            self.nnode = len(self.ips)
+
+            for idx, ip in enumerate(self.ips):
+                if ip == self.master_ip:
+                    self.node_rank = idx
 
         self.max_model_len = max_model_len
         self.max_num_seqs = max_num_seqs

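To trace the new derivation, here is a stand-alone sketch (the helper name is hypothetical, not FastDeploy API); note that the loop compares entries against the master address, exactly as the hunk does:

```python
from typing import List, Optional, Union

def derive_cluster_layout(ips: Optional[Union[str, List[str]]]):
    """Sketch of the Config.__init__ logic in the hunk above."""
    if ips is None:
        return "0.0.0.0", 1, 0  # master_ip, nnode, node_rank
    if not isinstance(ips, list):
        ips = ips.split(",")  # comma-separated string form
    master_ip = ips[0]  # first entry is the master node
    nnode = len(ips)
    node_rank = 0
    for idx, ip in enumerate(ips):
        if ip == master_ip:  # comparison as written in the hunk
            node_rank = idx
    return master_ip, nnode, node_rank

print(derive_cluster_layout("10.0.0.1,10.0.0.2"))  # ('10.0.0.1', 2, 0)
```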
@@ -775,14 +786,11 @@ class Config:
         assert self.device_ids.split(',').__len__() == self.worker_num_per_node, \
             f"invalid CUDA_VISIBLE_DEVICES, should be equal to {self.worker_num_per_node}"
 
-        assert self.worker_num_per_node % self.tensor_parallel_size == 0, \
-            f"tensor_parallel_size: {self.tensor_parallel_size} should be divisible by worker_num_per_node: {self.worker_num_per_node}"
-        self.local_device_ids = self.device_ids.split(
-            ',')[:self.tensor_parallel_size]
+        self.local_device_ids = self.device_ids.split(",")[: self.tensor_parallel_size]
 
         self.host_ip = get_host_ip()
 
-        if self.dist_init_ip is None or self.host_ip == self.master_ip:
+        if self.ips is None or self.host_ip == self.master_ip:
             self.is_master = True
         else:
             self.is_master = False

@@ -821,9 +829,6 @@ class Config:
         assert (
             is_port_available('0.0.0.0', self.engine_worker_queue_port)
         ), f"The parameter `engine_worker_queue_port`:{self.engine_worker_queue_port} is already in use."
-        assert (
-            self.max_chips_per_node >= self.tensor_parallel_size > 0
-        ), f"tensor_parallel_size: {self.tensor_parallel_size} should be between 1 and {self.max_chips_per_node}"
         assert (self.nnode >= 1), f"nnode: {self.nnode} should no less than 1"
         assert (
             self.max_model_len >= 16

@@ -879,7 +879,7 @@ class LLMEngine(object):
                 create=True)
 
         if self.do_profile:
-            get_profile_block_num = np.zeros([self.cfg.worker_num_per_node], dtype=np.int32)
+            get_profile_block_num = np.zeros([min(self.cfg.tensor_parallel_size, self.cfg.worker_num_per_node)], dtype=np.int32)
             self.get_profile_block_num_signal = IPCSignal(
                 name="get_profile_block_num",
                 array=get_profile_block_num,

@@ -937,10 +937,7 @@ class LLMEngine(object):
         Configure environment variables
         """
         variables = {
-            "PADDLE_TRAINER_ID": 0,
-            "PADDLE_TRAINERS_NUM": 1,
-            "TRAINER_INSTANCES_NUM": 1,
-            "TRAINER_INSTANCES": "0.0.0.0",
             "ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY": 0,
             "LOAD_STATE_DICT_THREAD_NUM": len(self.cfg.device_ids.split(',')),
             "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": "python",

@@ -1056,11 +1053,7 @@ class LLMEngine(object):
             if value:
                 arguments = arguments + f" --{worker_flag}"
         if self.cfg.nnode > 1:
-            pd_cmd = pd_cmd + (
-                f" --master {self.cfg.dist_init_addr}"
-                f" --nnodes {str(self.cfg.nnode)}"
-                f" --rank {str(self.cfg.node_rank)}"
-            )
+            pd_cmd = pd_cmd + f" --ips {','.join(self.cfg.ips)} --nnodes {len(self.cfg.ips)}"
         pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
         llm_logger.info("Launch worker service command: {}".format(pd_cmd))
         p = subprocess.Popen(

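Assuming a two-node deployment, the new branch appends the node list and count directly to the worker launch command. A sketch (the command prefix is an assumption for illustration, not shown in the hunk):

```python
# Hypothetical config values mirroring cfg.ips / cfg.nnode from the hunk.
class Cfg:
    nnode = 2
    ips = ["10.0.0.1", "10.0.0.2"]

cfg = Cfg()
pd_cmd = "python -m paddle.distributed.launch"  # assumed prefix
if cfg.nnode > 1:
    pd_cmd = pd_cmd + f" --ips {','.join(cfg.ips)} --nnodes {len(cfg.ips)}"
print(pd_cmd)
# python -m paddle.distributed.launch --ips 10.0.0.1,10.0.0.2 --nnodes 2
```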
@@ -1144,7 +1137,7 @@ class LLMEngine(object):
         """
         self.do_profile = 0
         num_gpu_blocks = -1
-        for i in range(self.cfg.tensor_parallel_size):
+        for i in range(min(self.cfg.tensor_parallel_size, self.cfg.worker_num_per_node)):
             while self.get_profile_block_num_signal.value[i] == 0:
                 time.sleep(1)
             if num_gpu_blocks < 0:

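Why `min(...)`: when tensor_parallel_size spans several nodes, only worker_num_per_node profiling slots exist on the local machine. A worked example with assumed sizes:

```python
tensor_parallel_size = 16  # assumed: TP group spans two nodes
worker_num_per_node = 8    # assumed: 8 GPUs visible on this node

# The engine now polls one profiling slot per *local* worker,
# not one per global tensor-parallel rank.
print(min(tensor_parallel_size, worker_num_per_node))  # 8
```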
@@ -24,6 +24,7 @@ from fastdeploy.input.preprocess import InputPreprocessor
 from fastdeploy.engine.request import Request
 from fastdeploy.inter_communicator import ZmqClient, IPCSignal
 from fastdeploy.metrics.work_metrics import work_process_metrics
+from fastdeploy.platforms import current_platform
 from fastdeploy.utils import api_server_logger, EngineError
 
 

@@ -43,7 +44,8 @@ class EngineClient:
         self.reasoning_parser = reasoning_parser
         self.data_processor = input_processor.create_processor()
         self.max_model_len = max_model_len
-        self.worker_healthy_live_recorded_time_array = np.zeros(shape=[tensor_parallel_size], dtype=np.int32)
+        max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
+        self.worker_healthy_live_recorded_time_array = np.zeros(shape=[tensor_parallel_size % max_chips_per_node], dtype=np.int32)
         self.worker_healthy_live_signal = IPCSignal(name="worker_healthy_live_signal",
                                                     array=self.worker_healthy_live_recorded_time_array,
                                                     dtype=np.int32,

@@ -122,8 +122,8 @@ async def lifespan(app: FastAPI):
                                      args.mm_processor_kwargs, args.enable_mm,
                                      args.reasoning_parser)
     app.state.dynamic_load_weight = args.dynamic_load_weight
-    chat_handler = OpenAIServingChat(engine_client, pid, args.dist_init_ip)
-    completion_handler = OpenAIServingCompletion(engine_client, pid, args.dist_init_ip)
+    chat_handler = OpenAIServingChat(engine_client, pid, args.ips)
+    completion_handler = OpenAIServingCompletion(engine_client, pid, args.ips)
     engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
     engine_client.pid = pid
     app.state.engine_client = engine_client

@@ -40,15 +40,19 @@ class OpenAIServingChat:
     OpenAI-style chat completions serving
     """
 
-    def __init__(self, engine_client, pid, dist_init_ip):
+    def __init__(self, engine_client, pid, ips):
         self.engine_client = engine_client
         self.pid = pid
-        self.master_ip = dist_init_ip
+        self.master_ip = ips
         self.host_ip = get_host_ip()
 
     def _check_master(self):
         if self.master_ip is None:
             return True
+        if isinstance(self.master_ip, list):
+            self.master_ip = self.master_ip[0]
+        else:
+            self.master_ip = self.master_ip.split(",")[0]
         if self.host_ip == self.master_ip:
             return True
         return False

@@ -45,15 +45,19 @@ from fastdeploy.engine.request import RequestOutput
 
 
 class OpenAIServingCompletion:
-    def __init__(self, engine_client, pid, dist_init_ip):
+    def __init__(self, engine_client, pid, ips):
         self.engine_client = engine_client
         self.pid = pid
-        self.master_ip = dist_init_ip
+        self.master_ip = ips
         self.host_ip = get_host_ip()
 
     def _check_master(self):
         if self.master_ip is None:
             return True
+        if isinstance(self.master_ip, list):
+            self.master_ip = self.master_ip[0]
+        else:
+            self.master_ip = self.master_ip.split(",")[0]
         if self.host_ip == self.master_ip:
             return True
         return False

@@ -100,16 +100,17 @@ class GpuWorker(WorkerBase):
         # 1. Record memory state before profile run
         start_time = time.perf_counter()
         Gb = 1024**3
-        paddle.device.cuda.reset_max_memory_reserved(self.local_rank)
-        paddle.device.cuda.reset_max_memory_allocated(self.local_rank)
+        local_rank = self.local_rank % self.max_chips_per_node
+        paddle.device.cuda.reset_max_memory_reserved(local_rank)
+        paddle.device.cuda.reset_max_memory_allocated(local_rank)
         paddle_reserved_mem_before_run = paddle.device.cuda.max_memory_reserved(
-            self.local_rank)
+            local_rank)
         paddle_allocated_mem_before_run = paddle.device.cuda.max_memory_allocated(
-            self.local_rank)  # not reserved
+            local_rank)  # not reserved
 
         pynvml.nvmlInit()
         handle = pynvml.nvmlDeviceGetHandleByIndex(
-            int(self.device_ids[self.local_rank]))
+            int(self.device_ids[local_rank]))
         before_run_meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)
 
         logger.info((

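self.local_rank is global across the cluster, while CUDA device indices restart at 0 on every node, hence the new modulo before every device call. A small sketch of the mapping (assuming 8 chips per node; 16 on iluvatar per the hunks):

```python
max_chips_per_node = 8  # assumed non-iluvatar node

# Global ranks 0..15 on two 8-GPU nodes fold onto device indices 0..7.
for global_rank in range(16):
    print(global_rank, "->", global_rank % max_chips_per_node)
```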
@@ -126,9 +127,9 @@ class GpuWorker(WorkerBase):
 
         # 3. Statistical memory information
         paddle_reserved_mem_after_run = paddle.device.cuda.max_memory_reserved(
-            self.local_rank)
+            local_rank)
         paddle_allocated_mem_after_run = paddle.device.cuda.max_memory_allocated(
-            self.local_rank)
+            local_rank)
 
         model_block_memory_used = self.cal_theortical_kvcache()
         paddle_peak_increase = paddle_reserved_mem_after_run - paddle_allocated_mem_before_run

@@ -143,7 +143,7 @@ class PaddleDisWorkerProc():
         # Initialize task queue
        task_address = (self.parallel_config.pod_ip,
                        self.parallel_config.engine_worker_queue_port)
+        self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
        self.task_queue = TaskQueue(
            address=task_address,
            is_server=False,

@@ -162,7 +162,6 @@ class PaddleDisWorkerProc():
             model_weights_status:
         """
         # init worker_ready_signal
-        self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
         array_size = min(
             self.max_chips_per_node, self.parallel_config.tensor_parallel_size *
             self.parallel_config.expert_parallel_size)

@@ -183,9 +182,9 @@ class PaddleDisWorkerProc():
             array=workers_alive,
             dtype=np.int32,
             suffix=self.parallel_config.engine_pid,
-            create=False)
-        self.worker_healthy_live_signal.value[self.local_rank % 8] = int(
-            time.time())
+            create=False,
+        )
+        self.worker_healthy_live_signal.value[self.local_rank % self.max_chips_per_node] = int(time.time())
 
         # init model_weights_status
         workers_model_weights = np.zeros(shape=[1], dtype=np.int32)

@@ -271,8 +270,7 @@ class PaddleDisWorkerProc():
                 paddle.distributed.barrier()
 
            self.insert_step = False
-            self.worker_healthy_live_signal.value[self.local_rank] = int(
-                time.time())
+            self.worker_healthy_live_signal.value[self.local_rank % self.max_chips_per_node] = int(time.time())
 
            # The first worker detects whether there are tasks in the task queue
            if self.local_rank % mp_num_per_node == 0:

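The heartbeat buffer has one slot per on-node worker, so global ranks are folded into it with the same modulo. A sketch using a plain numpy array as a stand-in for the IPCSignal-backed buffer:

```python
import time
import numpy as np

max_chips_per_node = 8  # assumed non-iluvatar node
# Stand-in for worker_healthy_live_signal.value (one slot per local worker).
heartbeats = np.zeros(shape=[max_chips_per_node], dtype=np.int32)

local_rank = 11  # assumed global rank on the second node
heartbeats[local_rank % max_chips_per_node] = int(time.time())
print(heartbeats)  # slot 3 carries this worker's heartbeat timestamp
```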
@@ -388,7 +386,7 @@ class PaddleDisWorkerProc():
                 suffix=self.parallel_config.engine_pid,
                 create=False)
             self.get_profile_block_num_signal.value[
-                self.local_rank] = num_blocks_local
+                self.local_rank % self.max_chips_per_node] = num_blocks_local
 
             # Wait all worker send the signal
             while np.any(self.get_profile_block_num_signal.value <= 0):

@@ -396,7 +394,7 @@ class PaddleDisWorkerProc():
             num_blocks_global = self.get_profile_block_num_signal.value.min(
             ).item()
             self.get_profile_block_num_signal.value[
-                self.local_rank] = num_blocks_global
+                self.local_rank % self.max_chips_per_node] = num_blocks_global
         else:
             num_blocks_global = self.fd_config.parallel_config.total_block_num
         # NOTE(liuzichang): Too big num_blocks_global will lead to error 700