[BUGFIX] Fix EP bug in "mixed" splitwise role (#3513)

* Update expert_service.py

* Update engine.py

* Update engine.py

* Update engine.py

* Update expert_service.py

* Update engine.py
This commit is contained in:
ltd0924
2025-08-22 11:35:50 +08:00
committed by GitHub
parent 4a9c04a746
commit c18975366e
2 changed files with 15 additions and 10 deletions

View File

@@ -124,8 +124,9 @@ class LLMEngine:
cfg.max_num_seqs, cfg, cfg.tensor_parallel_size, cfg.splitwise_role cfg.max_num_seqs, cfg, cfg.tensor_parallel_size, cfg.splitwise_role
) )
os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.engine_worker_queue_port) os.environ["INFERENCE_MSG_QUEUE_ID"] = str(
self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank
)
self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager) self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager)
self.token_processor = TokenProcessor( self.token_processor = TokenProcessor(

View File

@@ -59,7 +59,7 @@ class ExpertService:
self.cfg.disaggregate_info = None self.cfg.disaggregate_info = None
self.scheduler = cfg.scheduler_config.scheduler() self.scheduler = cfg.scheduler_config.scheduler()
if cfg.scheduler_config.name == "splitwise":
self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}") self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}")
self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id
@@ -143,7 +143,7 @@ class ExpertService:
self.token_processor.run() self.token_processor.run()
self.cfg.init_cache_info() self.cfg.init_cache_info()
if self.cfg.scheduler_config.name == "splitwise":
role = self.cfg.splitwise_role role = self.cfg.splitwise_role
host_ip = self.cfg.host_ip host_ip = self.cfg.host_ip
disaggregate = self.cfg.disaggregate_info disaggregate = self.cfg.disaggregate_info
@@ -363,6 +363,10 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix):
expert_service = ExpertService(cfg, local_data_parallel_id) expert_service = ExpertService(cfg, local_data_parallel_id)
try: try:
expert_service.start(ipc_signal_suffix, local_data_parallel_id) expert_service.start(ipc_signal_suffix, local_data_parallel_id)
if cfg.splitwise_role != "mixed":
expert_service.split_connector.start_receiver() expert_service.split_connector.start_receiver()
else:
while True:
time.sleep(100)
except Exception as e: except Exception as e:
llm_logger.exception(f"Expert service failed to start: {e}") llm_logger.exception(f"Expert service failed to start: {e}")