diff --git a/fastdeploy/inter_communicator/zmq_server.py b/fastdeploy/inter_communicator/zmq_server.py index ebdf311b8..4efd2c9c8 100644 --- a/fastdeploy/inter_communicator/zmq_server.py +++ b/fastdeploy/inter_communicator/zmq_server.py @@ -97,8 +97,17 @@ class ZmqServerBase(ABC): with self.response_token_lock: client, _, request_id = self.socket.recv_multipart(flags=zmq.NOBLOCK) req_id_str = request_id.decode("utf-8") + need_send_after_finished_inference = False with self.mutex: self.req_dict[req_id_str] = client + if req_id_str in self.cached_results: + if self.cached_results[req_id_str][-1][-1].finished: + need_send_after_finished_inference = True + if need_send_after_finished_inference: + self.send_response(req_id_str, []) + llm_logger.info(f"send_multipart finished, req_id: {req_id_str}") + self.req_dict.pop(req_id_str, None) + except zmq.Again: time.sleep(0.001) continue @@ -144,15 +153,12 @@ class ZmqServerBase(ABC): except Exception as e: llm_logger.error(f"Send result to zmq client failed: {e}") - if data[-1].finished: + if data and data[-1].finished: with self.mutex: - if req_id not in self.req_dict: - llm_logger.warning(f"req_id {req_id} finished but no result handle, drop it") - if req_id in self.cached_results: - del self.cached_results[req_id] - else: + if req_id in self.req_dict: llm_logger.info(f"send_multipart finished, req_id: {req_id}") - self.req_dict.pop(req_id, None) + self.req_dict.pop(req_id, None) + @abstractmethod def close(self):