From dd425b89ed6aa2637e0b8690c5fccb60ea4c3de0 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Wed, 15 Oct 2025 10:29:30 +0800 Subject: [PATCH] [BugFix] fix cache port and zmq close bugs (#4371) * Update common_engine.py * Update zmq_client.py * Update expert_service.py --------- Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> --- fastdeploy/engine/common_engine.py | 13 +++++++------ fastdeploy/engine/expert_service.py | 2 -- fastdeploy/inter_communicator/zmq_client.py | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index a8c918e89..3169e3939 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -63,12 +63,13 @@ class EngineSevice: cfg (Config): Config object containing all the configuration parameters. """ self.cfg = cfg - if isinstance(self.cfg.cache_config.cache_queue_port, str): - self.cfg.cache_config.cache_queue_port = self.cfg.cache_config.cache_queue_port.split(",") - if isinstance(self.cfg.cache_config.cache_queue_port, list): - self.cfg.cache_config.cache_queue_port = int( - self.cfg.cache_config.cache_queue_port[self.cfg.parallel_config.local_data_parallel_id] - ) + if cfg.splitwise_role != "mixed" or cfg.cache_config.enable_prefix_caching: + if isinstance(self.cfg.cache_config.cache_queue_port, str): + self.cfg.cache_config.cache_queue_port = self.cfg.cache_config.cache_queue_port.split(",") + if isinstance(self.cfg.cache_config.cache_queue_port, list): + self.cfg.cache_config.cache_queue_port = int( + self.cfg.cache_config.cache_queue_port[self.cfg.parallel_config.local_data_parallel_id] + ) self.scheduler = cfg.scheduler_config.scheduler() if envs.ENABLE_V1_KVCACHE_SCHEDULER: diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index ce95f2e16..6c20a71b8 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -116,8 +116,6 @@ class ExpertService: self.cache_manager_processes = self.engine.start_cache_service( self.cfg.local_device_ids, ipc_signal_suffix_cache ) - if self.cfg.splitwise_role != "mixed": - self.engine.split_mode_get_tasks() if self.cfg.scheduler_config.name == "splitwise": self.cfg.init_cache_info() diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 13242f2a2..6765757e9 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -18,6 +18,8 @@ from abc import ABC, abstractmethod import zmq +from fastdeploy.utils import llm_logger + class ZmqClientBase(ABC): """ @@ -89,3 +91,19 @@ class ZmqIpcClient(ZmqClientBase): def connect(self): self._ensure_socket() self.socket.connect(f"ipc://{self.file_name}") + + def close(self): + """ + Close the socket and context. + """ + llm_logger.info("ZMQ client is closing connection...") + try: + if self.socket is not None and not self.socket.closed: + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() + if self.context is not None: + self.context.term() + + except Exception as e: + llm_logger.warning(f"ZMQ client failed to close connection - {e}") + return