diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 8439cabc9..91661f297 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -212,7 +212,7 @@ class LLMEngine: ) self.worker_proc = self._start_worker_service() - console_logger.info("Waitting worker processes ready...") + console_logger.info("Waiting for worker processes to be ready...") time.sleep(5) self.worker_init_status = dict() diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 13242f2a2..ac9ba4bfe 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -18,6 +18,8 @@ from abc import ABC, abstractmethod import zmq +from fastdeploy.utils import llm_logger + class ZmqClientBase(ABC): """ @@ -72,6 +74,10 @@ class ZmqClientBase(ABC): self._ensure_socket() return self.socket.recv_pyobj() + @abstractmethod + def close(self): + pass + class ZmqIpcClient(ZmqClientBase): def __init__(self, name, mode): @@ -89,3 +95,19 @@ class ZmqIpcClient(ZmqClientBase): def connect(self): self._ensure_socket() self.socket.connect(f"ipc://{self.file_name}") + + def close(self): + """ + Close the socket and context. + """ + llm_logger.info("ZMQ client is closing connection...") + try: + if self.socket is not None and not self.socket.closed: + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() + if self.context is not None: + self.context.term() + + except Exception as e: + llm_logger.warning(f"ZMQ client failed to close connection - {e}") + return diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index 56488d53a..ebdf311b8 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -210,7 +210,7 @@ class ZmqIpcServer(ZmqServerBase): return self.running = False - llm_logger.info("Closing ZMQ connection...") + llm_logger.info("ZMQ server is closing connection...") try: if self.socket is not None and not self.socket.closed: self.socket.close() @@ -218,7 +218,7 @@ class ZmqIpcServer(ZmqServerBase): self.context.term() self._clear_ipc(self.file_name) except Exception as e: - llm_logger.warning(f"Failed to close ZMQ connection - {e}") + llm_logger.warning(f"ZMQ server failed to close connection - {e}") return @@ -290,7 +290,7 @@ class ZmqTcpServer(ZmqServerBase): return self.running = False - llm_logger.info("Closing ZMQ connection...") + llm_logger.info("ZMQ server is closing connection...") try: if self.socket is not None and not self.socket.closed: self.socket.close() @@ -298,5 +298,5 @@ class ZmqTcpServer(ZmqServerBase): self.context.term() except Exception as e: - llm_logger.warning(f"Failed to close ZMQ connection - {e}") + llm_logger.warning(f"ZMQ server failed to close connection - {e}") return