diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index dec1e8bae..56161e30c 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -124,8 +124,9 @@ class LLMEngine: cfg.max_num_seqs, cfg, cfg.tensor_parallel_size, cfg.splitwise_role ) - os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.engine_worker_queue_port) - + os.environ["INFERENCE_MSG_QUEUE_ID"] = str( + self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank + ) self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager) self.token_processor = TokenProcessor( diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 63b1b15be..0032780b9 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -59,8 +59,8 @@ class ExpertService: self.cfg.disaggregate_info = None self.scheduler = cfg.scheduler_config.scheduler() - - self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}") + if cfg.scheduler_config.name == "splitwise": + self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}") self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id @@ -143,11 +143,11 @@ class ExpertService: self.token_processor.run() self.cfg.init_cache_info() - - role = self.cfg.splitwise_role - host_ip = self.cfg.host_ip - disaggregate = self.cfg.disaggregate_info - self.scheduler.start(role, host_ip, disaggregate) + if self.cfg.scheduler_config.name == "splitwise": + role = self.cfg.splitwise_role + host_ip = self.cfg.host_ip + disaggregate = self.cfg.disaggregate_info + self.scheduler.start(role, host_ip, disaggregate) self.cfg.print() console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.") @@ -363,6 +363,10 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix): expert_service = ExpertService(cfg, local_data_parallel_id) try: expert_service.start(ipc_signal_suffix, local_data_parallel_id) - expert_service.split_connector.start_receiver() + if cfg.splitwise_role != "mixed": + expert_service.split_connector.start_receiver() + else: + while True: + time.sleep(100) except Exception as e: llm_logger.exception(f"Expert service failed to start: {e}")