[Bug fix] Fix zmq core bug (#3357)

* [Bug fix] Fix zmq core dump caused by a socket being used concurrently by multiple threads

* Fix zmq core dump caused by a socket being used concurrently by multiple threads
Author: chenjian
Date: 2025-08-13 20:24:39 +08:00
Committed by: GitHub
Parent: 7573802a88
Commit: 89177d881c
3 changed files with 20 additions and 17 deletions
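Background for reviewers: libzmq sockets are not thread-safe, so a socket shared between a receiving thread and a responding thread can corrupt internal state and crash the process (the core dump this commit fixes). The approach taken here is to serialize every operation on the shared socket through a single threading.Lock. A minimal sketch of that pattern, using hypothetical names rather than this repo's code:

import threading

import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.ROUTER)
sock.bind("tcp://127.0.0.1:5555")

# One lock guards every operation on the shared socket. zmq sockets are
# not thread-safe, so a recv thread and a send thread must never touch
# the socket at the same time.
sock_lock = threading.Lock()

def safe_send(frames):
    with sock_lock:
        sock.send_multipart(frames)

def safe_recv_nowait():
    with sock_lock:
        try:
            return sock.recv_multipart(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None  # nothing pending; the caller retries later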


@@ -223,14 +223,12 @@ class ZmqTcpServer(ZmqServerBase):
         Receive control command from client
         """
         self._ensure_socket()
-        while self.running:
-            try:
-                client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK)
-                task = msgpack.unpackb(task_data)
-                task_id_str = task["task_id"]
-            except zmq.Again:
-                time.sleep(0.001)
-                continue
+        try:
+            client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK)
+            task = msgpack.unpackb(task_data)
+            task_id_str = task["task_id"]
+        except zmq.Again:
+            return None
         with self.mutex:
             self.req_dict[task_id_str] = client
         return task
@@ -251,7 +249,7 @@ class ZmqTcpServer(ZmqServerBase):
         with self.mutex:
             self.req_dict.pop(task_id, None)
-        llm_logger.info(f"response control cmd finished, task_id: {task_id}")
+        llm_logger.debug(f"response control cmd finished, task_id: {task_id}")

     def close(self):
         """


@@ -525,6 +525,7 @@ class TokenProcessor:
                 for token_id in token_ids:
                     self.tokens_counter[task_id] += 1
                     if token_id != RECOVERY_STOP_SIGNAL:
-                        result.outputs.token_ids.append(token_id)
+                        if not (envs.FD_ENABLE_INTERNAL_ADAPTER and token_id in task.eos_token_ids):
+                            result.outputs.token_ids.append(token_id)
                         task.output_token_ids.append(token_id)
                     if token_id in task.eos_token_ids or is_prefill or recovery_stop:
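The TokenProcessor change above keeps EOS tokens out of the result returned to the external module when FD_ENABLE_INTERNAL_ADAPTER is set, while the task's own history still records every token. A reduced sketch of the guard; the function and variable names are hypothetical, and the RECOVERY_STOP_SIGNAL value is assumed for illustration:

RECOVERY_STOP_SIGNAL = -3  # assumed sentinel value, for illustration only

def collect_tokens(token_ids, eos_token_ids, internal_adapter_enabled):
    """Split generated tokens into client-visible output and full history."""
    client_visible = []  # what the external module receives
    full_history = []    # what the task records internally
    for token_id in token_ids:
        if token_id != RECOVERY_STOP_SIGNAL:
            # With the internal adapter enabled, EOS tokens are tracked
            # internally but never surfaced to the client.
            if not (internal_adapter_enabled and token_id in eos_token_ids):
                client_visible.append(token_id)
            full_history.append(token_id)
    return client_visible, full_history

# Example: token 2 is EOS; it is recorded but not returned.
print(collect_tokens([5, 7, 2], eos_token_ids={2}, internal_adapter_enabled=True))
# ([5, 7], [5, 7, 2])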


@@ -34,6 +34,7 @@ class InternalAdapter:
         self.engine = engine
         self.dp_rank = dp_rank
         recv_control_cmd_ports = envs.FD_ZMQ_CONTROL_CMD_SERVER_PORTS.split(",")
+        self.response_lock = threading.Lock()  # prevent concurrent send_multipart calls on the zmq socket
         self.recv_control_cmd_server = ZmqTcpServer(port=recv_control_cmd_ports[dp_rank], mode=zmq.ROUTER)
         self.recv_external_instruct_thread = threading.Thread(
             target=self._recv_external_module_control_instruct, daemon=True
@@ -43,7 +44,6 @@ class InternalAdapter:
             target=self._response_external_module_control_instruct, daemon=True
         )
         self.response_external_instruct_thread.start()
-        self.response_lock = threading.Lock()  # prevent concurrent send_multipart calls on the zmq socket

     def _get_current_server_info(self):
         """
@@ -71,13 +71,17 @@ class InternalAdapter:
         """
         while True:
             try:
-                task = self.recv_control_cmd_server.recv_control_cmd()
+                with self.response_lock:
+                    task = self.recv_control_cmd_server.recv_control_cmd()
+                if task is None:
+                    time.sleep(0.001)
+                    continue
                 logger.info(f"Receive control task: {task}")
                 task_id_str = task["task_id"]
                 if task["cmd"] == "get_payload":
                     payload_info = self._get_current_server_info()
                     result = {"task_id": task_id_str, "result": payload_info}
-                    logger.info(f"Response for task: {task_id_str}")
+                    logger.debug(f"Response for task: {task_id_str}")
                     with self.response_lock:
                         self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
@@ -87,7 +91,7 @@ class InternalAdapter:
                         extra_register_func=lambda reg: main_process_metrics.register_all(reg, workers=1),
                     )
                     result = {"task_id": task_id_str, "result": metrics_text}
-                    logger.info(f"Response for task: {task_id_str}")
+                    logger.debug(f"Response for task: {task_id_str}")
                     with self.response_lock:
                         self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
                 elif task["cmd"] == "connect_rdma":
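One design note on the receive loop above: recv_control_cmd is now non-blocking (it returns None on zmq.Again), so response_lock is held only for an instant per poll; holding it across a blocking recv would starve the response thread, which needs the same lock to send. A sketch of that lock-scoped polling pattern, with hypothetical names:

import threading
import time

import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.ROUTER)
sock.bind("tcp://127.0.0.1:5557")
lock = threading.Lock()

def poll_once():
    """Hold the lock only for the brief non-blocking recv."""
    with lock:
        try:
            return sock.recv_multipart(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None  # lock released immediately; senders are not starved

def recv_loop():
    while True:
        frames = poll_once()
        if frames is None:
            time.sleep(0.001)  # back off outside the lock
            continue
        ...  # handle frames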