Fix bug for concurrently visit zmq (#3233)

This commit is contained in:
chenjian
2025-08-06 10:41:10 +08:00
committed by GitHub
parent b20ffe3697
commit a9d231c900

View File

@@ -43,6 +43,7 @@ class InternalAdapter:
target=self._response_external_module_control_instruct, daemon=True
)
self.response_external_instruct_thread.start()
self.response_lock = threading.Lock() # prevent to call send_multipart in zmq concurrently
def _get_current_server_info(self):
"""
@@ -76,6 +77,7 @@ class InternalAdapter:
payload_info = self._get_current_server_info()
result = {"task_id": task_id_str, "result": payload_info}
logger.info(f"Response for task: {task_id_str}")
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
elif task["cmd"] == "get_metrics":
@@ -85,6 +87,7 @@ class InternalAdapter:
)
result = {"task_id": task_id_str, "result": metrics_text}
logger.info(f"Response for task: {task_id_str}")
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
elif task["cmd"] == "connect_rdma":
self.engine.engine_worker_queue.put_connect_rdma_task(task)
@@ -100,6 +103,7 @@ class InternalAdapter:
task_id_str = result_data["task_id"]
result = {"task_id": task_id_str, "result": result_data}
logger.info(f"Response for task: {task_id_str}")
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
else:
time.sleep(0.001)