mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
add check health in FD (#5534)
This commit is contained in:
@@ -152,6 +152,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
|
||||
"FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")),
|
||||
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
|
||||
"FMQ_CONFIG_JSON": lambda: os.getenv("FMQ_CONFIG_JSON", None),
|
||||
"FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -121,6 +121,30 @@ class TokenProcessor:
|
||||
self.accept_token_num_per_head_per_request = {}
|
||||
self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS
|
||||
|
||||
# health monitor
|
||||
self.timestamp_for_alive_before_handle_batch = None
|
||||
self.timestamp_for_alive_after_handle_batch = None
|
||||
self.health_lock = threading.Lock()
|
||||
self.engine_output_token_hang = False
|
||||
|
||||
def healthy(self):
|
||||
"""
|
||||
whether token processor is healthy
|
||||
"""
|
||||
with self.health_lock:
|
||||
if self.timestamp_for_alive_after_handle_batch is None: # has entered handle batch
|
||||
if (
|
||||
self.timestamp_for_alive_before_handle_batch is not None
|
||||
and time.time() - self.timestamp_for_alive_before_handle_batch
|
||||
> envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT
|
||||
):
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
if self.engine_output_token_hang:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _cleanup_resources(self):
|
||||
"""Cleaning up shared memory resources"""
|
||||
if hasattr(self, "executor"):
|
||||
@@ -404,7 +428,14 @@ class TokenProcessor:
|
||||
if self.output_tokens[0, 0] == -2:
|
||||
continue
|
||||
llm_logger.debug(f"rank_id {rank_id} self.output_tokens[0, 0] {self.output_tokens[0, 0]}")
|
||||
with self.health_lock:
|
||||
self.timestamp_for_alive_before_handle_batch = time.time()
|
||||
self.timestamp_for_alive_after_handle_batch = None
|
||||
self._process_batch_output()
|
||||
with self.health_lock:
|
||||
self.timestamp_for_alive_before_handle_batch = None
|
||||
self.timestamp_for_alive_after_handle_batch = time.time()
|
||||
|
||||
except Exception as e:
|
||||
llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}")
|
||||
|
||||
|
||||
@@ -95,6 +95,12 @@ class InternalAdapter:
|
||||
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
|
||||
elif task["cmd"] == "connect_rdma":
|
||||
self.engine.engine_worker_queue.put_connect_rdma_task(task)
|
||||
elif task["cmd"] == "check_health":
|
||||
is_health = self.engine.token_processor.healthy()
|
||||
result = {"task_id": task_id_str, "result": is_health}
|
||||
logger.debug(f"Response for task: {task_id_str}: is_health {is_health}")
|
||||
with self.response_lock:
|
||||
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")
|
||||
|
||||
Reference in New Issue
Block a user