diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 2f54f2725..712f9195f 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -152,6 +152,7 @@ environment_variables: dict[str, Callable[[], Any]] = { "FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")), "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")), "FMQ_CONFIG_JSON": lambda: os.getenv("FMQ_CONFIG_JSON", None), + "FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")), } diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index d7afd1947..197b7f64d 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -121,6 +121,30 @@ class TokenProcessor: self.accept_token_num_per_head_per_request = {} self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS + # health monitor + self.timestamp_for_alive_before_handle_batch = None + self.timestamp_for_alive_after_handle_batch = None + self.health_lock = threading.Lock() + self.engine_output_token_hang = False + + def healthy(self): + """ + whether token processor is healthy + """ + with self.health_lock: + if self.timestamp_for_alive_after_handle_batch is None: # has entered handle batch + if ( + self.timestamp_for_alive_before_handle_batch is not None + and time.time() - self.timestamp_for_alive_before_handle_batch + > envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT + ): + return False + else: + return True + if self.engine_output_token_hang: + return False + return True + def _cleanup_resources(self): """Cleaning up shared memory resources""" if hasattr(self, "executor"): @@ -404,7 +428,14 @@ class TokenProcessor: if self.output_tokens[0, 0] == -2: continue llm_logger.debug(f"rank_id {rank_id} self.output_tokens[0, 0] {self.output_tokens[0, 0]}") + with self.health_lock: + self.timestamp_for_alive_before_handle_batch = time.time() + self.timestamp_for_alive_after_handle_batch = None self._process_batch_output() + with self.health_lock: + self.timestamp_for_alive_before_handle_batch = None + self.timestamp_for_alive_after_handle_batch = time.time() + except Exception as e: llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}") diff --git a/fastdeploy/splitwise/internal_adapter_utils.py b/fastdeploy/splitwise/internal_adapter_utils.py index 47b615b5a..576a3717f 100644 --- a/fastdeploy/splitwise/internal_adapter_utils.py +++ b/fastdeploy/splitwise/internal_adapter_utils.py @@ -95,6 +95,12 @@ class InternalAdapter: self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) elif task["cmd"] == "connect_rdma": self.engine.engine_worker_queue.put_connect_rdma_task(task) + elif task["cmd"] == "check_health": + is_health = self.engine.token_processor.healthy() + result = {"task_id": task_id_str, "result": is_health} + logger.debug(f"Response for task: {task_id_str}: is_health {is_health}") + with self.response_lock: + self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result) except Exception as e: logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")