[FDConfig]Remove splitwise_role and engine_worker_queue_port in FDConfig (#4147)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled

* remove splitwise_role and engine_worker_queue_port

* fix xpu

* fix xpu

* fix xpu

* fix unittest

* resolve conflct
This commit is contained in:
YuanRisheng
2025-09-19 17:01:52 +08:00
committed by GitHub
parent ee9d8a840a
commit 24180fba0a
23 changed files with 129 additions and 89 deletions

View File

@@ -76,10 +76,10 @@ class EngineService:
cfg.scheduler_config.max_num_seqs,
cfg,
cfg.parallel_config.tensor_parallel_size,
cfg.splitwise_role,
cfg.scheduler_config.splitwise_role,
cfg.parallel_config.local_data_parallel_id,
)
if cfg.splitwise_role != "mixed":
if cfg.scheduler_config.splitwise_role != "mixed":
raise NotImplementedError(
"Currently ENABLE_V1_KVCACHE_SCHEDULER=1 only supported in mixed sampling now."
)
@@ -88,13 +88,13 @@ class EngineService:
cfg.scheduler_config.max_num_seqs,
cfg,
cfg.parallel_config.tensor_parallel_size,
cfg.splitwise_role,
cfg.scheduler_config.splitwise_role,
cfg.parallel_config.local_data_parallel_id,
)
self.start_worker_queue_service(start_queue)
os.environ["INFERENCE_MSG_QUEUE_ID"] = self.cfg.engine_worker_queue_port[
os.environ["INFERENCE_MSG_QUEUE_ID"] = self.cfg.parallel_config.engine_worker_queue_port[
self.cfg.parallel_config.local_data_parallel_id
]
@@ -137,7 +137,9 @@ class EngineService:
self.token_processor.run()
def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进程感知是否有新Task需要处理
current_suffix = int(self.cfg.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id])
current_suffix = int(
self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
)
llm_logger.info(f"current_suffix: {current_suffix}")
exist_task_signal_data = np.zeros([1], dtype=np.int32)
self.exist_task_signal = IPCSignal(
@@ -195,7 +197,7 @@ class EngineService:
"""
address = (
self.cfg.master_ip,
int(self.cfg.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]),
int(self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]),
)
if start_queue and (self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0"):
@@ -209,7 +211,7 @@ class EngineService:
if (
self.cfg.cache_config.enable_prefix_caching
or self.cfg.splitwise_role != "mixed"
or self.cfg.scheduler_config.splitwise_role != "mixed"
and self.cfg.parallel_config.local_data_parallel_id == 0
):
self.cache_task_queue = EngineCacheQueue(
@@ -253,7 +255,10 @@ class EngineService:
del self.resource_manager.req_dict[task.request_id]
cur_task = self.resource_manager.tasks_list[cur_task_idx]
cur_task.prompt_token_ids[0] = task.outputs.token_ids[0]
if self.cfg.speculative_config.method in ["mtp"] and self.cfg.splitwise_role == "decode":
if (
self.cfg.speculative_config.method in ["mtp"]
and self.cfg.scheduler_config.splitwise_role == "decode"
):
cur_task.draft_token_ids = copy.deepcopy(task.outputs.draft_token_ids)
if task.error_code != 200:
self.resource_manager.stop_flags[cur_task_idx] = True
@@ -478,7 +483,10 @@ class EngineService:
time.sleep(0.001)
continue
if hasattr(self, "exist_prefill_task_signal") and self.exist_prefill_task_signal.value[0] > 0:
if self.cfg.splitwise_role == "mixed" or self.split_connector.has_splitwise_tasks():
if (
self.cfg.scheduler_config.splitwise_role == "mixed"
or self.split_connector.has_splitwise_tasks()
):
time.sleep(0.005)
continue
if self.engine_worker_queue.num_cache_infos() > 0:
@@ -507,7 +515,7 @@ class EngineService:
continue
current_id = (current_id + 1) % 100003
if self.cfg.splitwise_role != "mixed":
if self.cfg.scheduler_config.splitwise_role != "mixed":
llm_logger.info("Inserting splitwise tasks")
self.split_connector.send_splitwise_tasks(tasks, current_id)
@@ -759,7 +767,7 @@ class EngineService:
device_ids=device_ids,
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=int(
self.cfg.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
),
pid_suffix=ipc_signal_suffix,
)