From c801d31c9c4e5ce9f77c640d318d54387b98df02 Mon Sep 17 00:00:00 2001
From: kevin
Date: Fri, 31 Oct 2025 15:26:35 +0800
Subject: [PATCH] add checker (#4711)

---
 fastdeploy/engine/common_engine.py | 44 +++++++++++++++++++++++-
 fastdeploy/envs.py                 |  3 ++
 fastdeploy/utils.py                | 54 ++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index b9fad30e6..ff521ba6e 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -47,7 +47,14 @@ from fastdeploy.model_executor.guided_decoding import schema_checker
 from fastdeploy.plugins.token_processor import load_token_processor_plugins
 from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
 from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
-from fastdeploy.utils import EngineError, envs, get_logger, llm_logger
+from fastdeploy.utils import (
+    EngineError,
+    check_download_links,
+    envs,
+    get_logger,
+    init_bos_client,
+    llm_logger,
+)
 
 try:
     TokenProcessor = load_token_processor_plugins()
@@ -128,6 +135,7 @@ class EngineService:
                 * self.cfg.cache_config.block_size
             )
 
+        self.bos_client = None
         self.guided_decoding_checker = None
         if self.cfg.structured_outputs_config.guided_decoding_backend != "off":
             self.guided_decoding_checker = schema_checker(
@@ -827,6 +835,23 @@ class EngineService:
                             self.llm_logger.error(f"Receive request error: {err_msg}")
                             results.append((request.request_id, err_msg))
 
+                    # Test err_msg first so a request that already failed is not inspected again.
+                    if err_msg is None and self._has_features_info(request):
+                        if self.bos_client is None:
+                            # Created lazily, on the first request that carries feature URLs.
+                            self.bos_client = init_bos_client()
+
+                        # Collect every feature URL so all of them are checked before the request is accepted.
+                        inputs = request.multimodal_inputs
+                        download_urls = []
+                        for key in ("video_feature_urls", "image_feature_urls", "audio_feature_urls"):
+                            download_urls.extend(inputs.get(key) or [])
+
+                        err_msg = check_download_links(self.bos_client, download_urls)
+                        if err_msg:
+                            self.llm_logger.error(f"Receive request {request.request_id} download error: {err_msg}")
+                            results.append((request.request_id, err_msg))
+
                     if err_msg is None:
                         insert_task.append(request)
 
@@ -877,6 +902,23 @@ class EngineService:
             del self.data_processor.decode_status[req_id]
         return delta_text, token_ids
 
+    def _has_features_info(self, task):
+        """
+        Return True when the task carries at least one multimodal feature URL.
+
+        Args:
+            task: object exposing a ``multimodal_inputs`` mapping (may be None or empty).
+
+        Returns:
+            bool: True if any of the video/image/audio feature URL entries is non-empty.
+        """
+        inputs = task.multimodal_inputs
+        if not inputs:
+            return False
+
+        feature_keys = ("video_feature_urls", "image_feature_urls", "audio_feature_urls")
+        return any(inputs.get(key) for key in feature_keys)
+
     def _zmq_send_generated_tokens(self):
         """
         Recieve output for zmq
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index d60750d6a..3d4c2feaa 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -128,6 +128,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")),
     # Count for cache_transfer_manager process error
     "FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
+    # BOS credentials (AK / SK) passed to BceCredentials by init_bos_client; None when unset
+    "ENCODE_FEATURE_BOS_AK": lambda: os.getenv("ENCODE_FEATURE_BOS_AK"),
+    "ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"),
 }
 
 
diff --git a/fastdeploy/utils.py b/fastdeploy/utils.py
index 604efd1a1..a5a45a8a2 100644
--- a/fastdeploy/utils.py
+++ b/fastdeploy/utils.py
@@ -944,6 +944,60 @@ def get_logger(name, file_name=None, without_formater=False, print_to_console=Fa
     return FastDeployLogger().get_logger(name, file_name, without_formater, print_to_console)
 
 
+def check_download_links(bos_client, links, timeout=1):
+    """
+    Check that every BOS link resolves to an object with a positive content length.
+
+    Args:
+        bos_client: client exposing ``get_object_meta_data(bucket_name, object_key)``.
+        links: iterable of link strings, optionally prefixed with ``bos://``.
+        timeout: currently unused; kept so existing callers keep working.
+
+    Returns:
+        None when every link passes, otherwise an error message for the first failing link.
+    """
+    bos_scheme = "bos://"
+    for link in links:
+        try:
+            if link.startswith(bos_scheme):
+                # Strip only the leading scheme; str.replace would also rewrite later occurrences.
+                link = link[len(bos_scheme) :]
+
+            # The first path segment is skipped, the last one is the object key and
+            # everything in between is passed as the bucket name.
+            segments = link.split("/")
+            bucket_name = "/".join(segments[1:-1])
+            object_key = segments[-1]
+            response = bos_client.get_object_meta_data(bucket_name, object_key)
+            content_length = response.metadata.content_length
+            # Raise explicitly instead of using assert, which is stripped under "python -O".
+            if int(content_length) <= 0:
+                raise ValueError(f"bos download length error, {content_length}")
+        except Exception as e:
+            return f"link {link} download error: {str(e)}"
+    return None
+
+
+def init_bos_client():
+    """
+    Build a BosClient for the ``bj.bcebos.com`` endpoint.
+
+    Credentials are read from ``envs.ENCODE_FEATURE_BOS_AK`` and ``envs.ENCODE_FEATURE_BOS_SK``.
+    The baidubce SDK is imported inside the function, so it is only required when this is called.
+
+    Returns:
+        BosClient: client configured with those credentials.
+    """
+    from baidubce.auth.bce_credentials import BceCredentials
+    from baidubce.bce_client_configuration import BceClientConfiguration
+    from baidubce.services.bos.bos_client import BosClient
+
+    cfg = BceClientConfiguration(
+        credentials=BceCredentials(envs.ENCODE_FEATURE_BOS_AK, envs.ENCODE_FEATURE_BOS_SK), endpoint="bj.bcebos.com"
+    )
+    return BosClient(cfg)
+
+
 llm_logger = get_logger("fastdeploy", "fastdeploy.log")
 data_processor_logger = get_logger("data_processor", "data_processor.log")
 scheduler_logger = get_logger("scheduler", "scheduler.log")