diff --git a/fastdeploy/splitwise/internal_adapter_utils.py b/fastdeploy/splitwise/internal_adapter_utils.py index db3ea520d..dc0ddf544 100644 --- a/fastdeploy/splitwise/internal_adapter_utils.py +++ b/fastdeploy/splitwise/internal_adapter_utils.py @@ -43,6 +43,7 @@ class InternalAdapter: target=self._response_external_module_control_instruct, daemon=True ) self.response_external_instruct_thread.start() + self.response_lock = threading.Lock() # prevent concurrent calls to send_multipart in zmq def _get_current_server_info(self): """ @@ -76,7 +77,8 @@ class InternalAdapter: payload_info = self._get_current_server_info() result = {"task_id": task_id_str, "result": payload_info} logger.info(f"Response for task: {task_id_str}") - self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) + with self.response_lock: + self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) elif task["cmd"] == "get_metrics": metrics_text = get_filtered_metrics( @@ -85,7 +87,8 @@ class InternalAdapter: ) result = {"task_id": task_id_str, "result": metrics_text} logger.info(f"Response for task: {task_id_str}") - self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) + with self.response_lock: + self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) elif task["cmd"] == "connect_rdma": self.engine.engine_worker_queue.put_connect_rdma_task(task) @@ -100,7 +103,8 @@ class InternalAdapter: task_id_str = result_data["task_id"] result = {"task_id": task_id_str, "result": result_data} logger.info(f"Response for task: {task_id_str}") - self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) + with self.response_lock: + self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) else: time.sleep(0.001) except Exception as e: