diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py
index 6fcbeac1e..c7fd2009c 100644
--- a/fastdeploy/engine/args_utils.py
+++ b/fastdeploy/engine/args_utils.py
@@ -452,6 +452,8 @@ class EngineArgs:
 
         if "PaddleOCR" in get_model_architecture(self.model, self.model_config_name):
             envs.FD_ENABLE_MAX_PREFILL = 1
+            self.enable_prefix_caching = False
+            self.max_encoder_cache = 0
 
     @staticmethod
     def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
diff --git a/fastdeploy/input/paddleocr_vl_processor/paddleocr_vl_processor.py b/fastdeploy/input/paddleocr_vl_processor/paddleocr_vl_processor.py
index d9da0ef38..2e9e680c0 100644
--- a/fastdeploy/input/paddleocr_vl_processor/paddleocr_vl_processor.py
+++ b/fastdeploy/input/paddleocr_vl_processor/paddleocr_vl_processor.py
@@ -47,6 +47,7 @@ class PaddleOCRVLProcessor(TextProcessor):
         mm_processor_kwargs=None,
         reasoning_parser_obj=None,
         tool_parser_obj=None,
+        enable_processor_cache=False,
     ):
         """
         Initialize PaddleOCRVLProcessor instance.
@@ -65,6 +66,7 @@ class PaddleOCRVLProcessor(TextProcessor):
         processor_kwargs = self._parse_processor_kwargs(mm_processor_kwargs)
         self.processor = DataProcessor(
             model_path=model_name_or_path,
+            enable_processor_cache=enable_processor_cache,
             tokens_per_second=config.vision_config.tokens_per_second,
             tokenizer=self.tokenizer,
             **processor_kwargs,
@@ -252,7 +254,7 @@ class PaddleOCRVLProcessor(TextProcessor):
 
         return request
 
-    def append_generated_tokens(self, outputs, generated_token_ids):
+    def append_generated_tokens(self, multimodal_inputs, generated_token_ids):
         """
         Append generated tokens to existing outputs.
 
@@ -260,19 +262,13 @@ class PaddleOCRVLProcessor(TextProcessor):
             outputs: Current model outputs
             generated_token_ids: Generated tokens to append
         """
-        out = {"input_ids": [], "token_type_ids": [], "position_ids": [], "cur_position": outputs["cur_position"]}
-        self.processor._add_text(generated_token_ids, out)
+        num_tokens = len(generated_token_ids)
+        multimodal_inputs["input_ids"].extend(generated_token_ids)
+        multimodal_inputs["token_type_ids"].extend([0] * num_tokens)
 
-        outputs["input_ids"] = np.concatenate(
-            [outputs["input_ids"], np.array(out["input_ids"], dtype=np.int64)], axis=0
-        )
-        outputs["token_type_ids"] = np.concatenate(
-            [outputs["token_type_ids"], np.array(out["token_type_ids"], dtype=np.int64)], axis=0
-        )
-        outputs["position_ids"] = np.concatenate(
-            [outputs["position_ids"], out["position_ids"][0]], axis=1, dtype=np.int64
-        )
-        outputs["cur_position"] = out["cur_position"]
+        pos_ids = self.processor._compute_text_positions(multimodal_inputs["cur_position"], num_tokens)
+        multimodal_inputs["position_ids"].append(pos_ids)
+        multimodal_inputs["cur_position"] += num_tokens
 
     def pack_outputs(self, outputs):
         """
@@ -284,6 +280,22 @@ class PaddleOCRVLProcessor(TextProcessor):
         Returns:
             dict: Packed output dictionary with all required fields
         """
+        if not outputs["images"]:
+            outputs["images"] = None  # No images case
+            outputs["grid_thw"] = None  # No spatial dimensions
+            outputs["image_type_ids"] = None  # No type IDs
+        else:
+            outputs["images"] = np.vstack(outputs["images"])  # Stack image features vertically
+            outputs["grid_thw"] = np.vstack(outputs["grid_thw"])  # Stack spatial dimensions
+            outputs["image_type_ids"] = np.array(outputs["image_type_ids"])  # Convert to numpy array
+
+        # Convert all outputs to numpy arrays with appropriate types
+        outputs["input_ids"] = np.array(outputs["input_ids"], dtype=np.int64)  # Token IDs as int64
+        outputs["token_type_ids"] = np.array(outputs["token_type_ids"], dtype=np.int64)  # Type IDs as int64
+        outputs["position_ids"] = np.concatenate(
+            outputs["position_ids"], axis=1, dtype=np.int64
+        )  # Concatenate position IDs
+
         outputs["image_patch_id"] = self.processor.image_token_id
         outputs["video_patch_id"] = self.processor.video_token_id
         outputs["position_ids"] = outputs["position_ids"].transpose(1, 0)
diff --git a/fastdeploy/input/paddleocr_vl_processor/process.py b/fastdeploy/input/paddleocr_vl_processor/process.py
index 436721b52..67acbad44 100644
--- a/fastdeploy/input/paddleocr_vl_processor/process.py
+++ b/fastdeploy/input/paddleocr_vl_processor/process.py
@@ -15,16 +15,23 @@
 # limitations under the License.
 """
 
-from typing import Any, Dict, List, Union
+import pickle
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
+import zmq
 from paddleformers.transformers import AutoTokenizer
+from PIL import Image
 
+from fastdeploy.engine.request import ImagePosition
 from fastdeploy.entrypoints.chat_utils import parse_chat_messages
+from fastdeploy.input.ernie4_5_vl_processor import read_video_decord
 from fastdeploy.input.utils import IDS_TYPE_FLAG
+from fastdeploy.multimodal.hasher import MultimodalHasher
 from fastdeploy.utils import data_processor_logger
 
 from .image_processor import ImageProcessor
+from .process_video import sample_frames
 
 
 class DataProcessor:
@@ -48,8 +55,11 @@ class DataProcessor:
     def __init__(
         self,
         model_path: str,
+        enable_processor_cache: bool = False,
         video_min_frames: int = 4,
         video_max_frames: int = 768,
+        video_target_frames: int = -1,
+        video_fps: int = -1,
         tokens_per_second: int = 2,
         tokenizer=None,
         **kwargs,
@@ -66,6 +76,8 @@ class DataProcessor:
         """
         self.min_frames = video_min_frames
         self.max_frames = video_max_frames
+        self.target_frames = video_target_frames
+        self.fps = video_fps
 
         # Initialize tokenizer with left padding and fast tokenizer
         if tokenizer is None:
@@ -74,13 +86,13 @@ class DataProcessor:
         else:
             self.tokenizer = tokenizer
         self.image_processor = ImageProcessor.from_pretrained(model_path)  # Initialize image processor
+        self.enable_processor_cache = enable_processor_cache
 
         # Convolution sizes for patch aggregation
         self.spatial_conv_size = self.image_processor.merge_size
         self.temporal_conv_size = self.image_processor.temporal_patch_size
 
         # Special tokens and IDs
-
         self.image_token = "<|IMAGE_PLACEHOLDER|>"
         self.video_token = "<|video_pad|>"
 
@@ -100,41 +112,7 @@ class DataProcessor:
             "assistant": "Assistant: ",
         }
 
-    def _pack_outputs(self, outputs):
-        """
-        Pack and convert all output data into numpy arrays with appropriate types.
-
-        Args:
-            outputs (dict): Dictionary containing model outputs with keys:
-                - images: List of visual features
-                - grid_thw: List of spatial dimensions
-                - image_type_ids: List of content type indicators
-                - input_ids: List of token IDs
-                - token_type_ids: List of type identifiers
-                - position_ids: List of position embeddings
-
-        Returns:
-            dict: Processed outputs with all values converted to numpy arrays
-        """
-        # Process visual outputs - stack if exists or set to None if empty
-        if not outputs["images"]:
-            outputs["images"] = None  # No images case
-            outputs["grid_thw"] = None  # No spatial dimensions
-            outputs["image_type_ids"] = None  # No type IDs
-        else:
-            outputs["images"] = np.vstack(outputs["images"])  # Stack image features vertically
-            outputs["grid_thw"] = np.vstack(outputs["grid_thw"])  # Stack spatial dimensions
-            outputs["image_type_ids"] = np.array(outputs["image_type_ids"])  # Convert to numpy array
-
-        # Convert all outputs to numpy arrays with appropriate types
-        outputs["input_ids"] = np.array(outputs["input_ids"], dtype=np.int64)  # Token IDs as int64
-        outputs["token_type_ids"] = np.array(outputs["token_type_ids"], dtype=np.int64)  # Type IDs as int64
-        outputs["position_ids"] = np.concatenate(
-            outputs["position_ids"], axis=1, dtype=np.int64
-        )  # Concatenate position IDs
-        return outputs
-
-    def text2ids(self, text, images=None, videos=None):
+    def text2ids(self, text, images=None, videos=None, image_uuid=None, video_uuid=None):
         """
         Convert text with image/video placeholders into model inputs.
 
@@ -142,6 +120,8 @@ class DataProcessor:
             text: Input text with <|image@placeholder|> and <|video@placeholder|> markers
             images: List of PIL Images corresponding to image placeholders
             videos: List of video data corresponding to video placeholders
+            image_uuid: List of unique identifiers for each image, used for caching or hashing.
+            video_uuid: List of unique identifiers for each video, used for caching or hashing.
 
         Returns:
             Dict containing:
@@ -162,11 +142,14 @@ class DataProcessor:
             "image_type_ids": [],
             "labels": [],
             "cur_position": 0,
-            "pic_cnt": 0,
             "video_cnt": 0,
+            "fps": [],
+            "mm_positions": [],
+            "mm_hashes": [],
             "vit_seqlen": [],
             "vit_position_ids": [],
         }
+
         # Define placeholders and their lengths
         IMAGE_PLACEHOLDER = self.image_token
         VIDEO_PLACEHOLDER = self.video_token
@@ -188,23 +171,30 @@ class DataProcessor:
                     break
 
             if ed == image_pos:
-                outputs["pic_cnt"] += 1
-                self._add_image(images[image_idx], outputs)
+                image = images[image_idx]
+                uuid = image_uuid[image_idx] if image_uuid else None
+                if not isinstance(image, tuple):
+                    self._add_image(image, outputs, uuid)
+                else:
+                    self._add_processed_image(image, outputs, uuid)
                 image_idx += 1
                 st = ed + IMAGE_PLACEHOLDER_LEN
             else:
                 item = videos[video_idx]
-                if isinstance(item, dict):
-                    frames, meta = self._load_and_process_video(item["video"], item)
+                uuid = video_uuid[video_idx] if video_uuid else None
+                if not isinstance(item, tuple):
+                    if isinstance(item, dict):
+                        frames, meta = self._load_and_process_video(item["video"], item)
+                    else:
+                        frames, meta = self._load_and_process_video(item, {})
+                    self._add_video(frames, meta, outputs, uuid)
                 else:
-                    frames, meta = self._load_and_process_video(item, {})
-
-                outputs["video_cnt"] += 1
-                self._add_video(frames, meta, outputs)
+                    # cached frames are already processed
+                    self._add_processed_video(item, outputs, uuid)
                 video_idx += 1
                 st = ed + VIDEO_PLACEHOLDER_LEN
 
-        return self._pack_outputs(outputs)
+        return outputs
 
     def request2ids(
         self, request: Dict[str, Any], tgts: List[str] = None
@@ -222,76 +212,84 @@ class DataProcessor:
             Dict with same structure as text2ids() output
         """
-        outputs = {
-            "input_ids": [],
-            "token_type_ids": [],
-            "position_ids": [],
-            "images": [],
-            "grid_thw": [],
-            "image_type_ids": [],
-            "labels": [],
-            "cur_position": 0,
-            "pic_cnt": 0,
-            "video_cnt": 0,
-            "vit_seqlen": [],
-            "vit_position_ids": [],
-        }
-
         # Parse and validate chat messages
         messages = parse_chat_messages(request.get("messages"))
-        image_message_list = []  # Store visual content messages
-
+        mm_items = []
         for msg in messages:
             role = msg.get("role")
             assert role in self.role_prefixes, f"Unsupported role: {role}"
 
             # Normalize content to list format
-            content_items = msg.get("content")
-            if not isinstance(content_items, list):
-                content_items = [content_items]
-
+            content = msg.get("content")
+            if not isinstance(content, list):
+                content = [content]
             # Collect all visual content items
-            for item in content_items:
-                if isinstance(item, dict) and item.get("type") in ["image", "video"]:
-                    image_message_list.append(item)
+            for item in content:
+                if item.get("type") in ["image", "video"]:
+                    mm_items.append(item)
 
-        raw_messages = request["messages"]
-        request["messages"] = messages
+        missing_hashes, missing_idx = [], []
+        for idx, item in enumerate(mm_items):
+            if not item.get("data"):
+                # raw data not provided, should be retrieved from processor cache
+                missing_hashes.append(item.get("uuid"))
+                missing_idx.append(idx)
 
-        prompt_token_ids = self.apply_chat_template(request)
-        if len(prompt_token_ids) == 0:
-            raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")
-        request["messages"] = raw_messages
+        if len(missing_hashes) > 0 and not self.enable_processor_cache:
+            raise ValueError("Missing items cannot be retrieved without processor cache.")
 
-        vision_start_index = 0
-        vision_message_index = 0
-        for i in range(len(prompt_token_ids)):
-            if prompt_token_ids[i] == self.vision_start_id:
-                self._add_text(prompt_token_ids[vision_start_index : i + 1], outputs)
+        if self.enable_processor_cache:
+            context = zmq.Context()
+            dealer = context.socket(zmq.DEALER)
+            dealer.connect("ipc:///dev/shm/processor_cache.ipc")
 
-                vision_start_index = i + 1
-                image_message = image_message_list[vision_message_index]
+            missing_items = self.get_processor_cache(dealer, missing_hashes)
+            for idx in range(len(missing_items)):
+                if not missing_items[idx]:
+                    raise ValueError(f"Missing item {idx} not found in processor cache")
+                mm_items[missing_idx[idx]]["data"] = missing_items[idx]
 
-                if image_message["type"] == "image":
-                    img = image_message.get("image")
-                    if img is None:
-                        continue
-                    outputs["pic_cnt"] += 1
-                    self._add_image(img, outputs)
+        images, videos = [], []
+        image_uuid, video_uuid = [], []
+        for item in mm_items:
+            if item.get("type") == "image":
+                images.append(item["data"])
+                image_uuid.append(item["uuid"])
+            elif item.get("type") == "video":
+                videos.append(item["data"])
+                video_uuid.append(item["uuid"])
+            else:
+                raise ValueError(f"Unsupported multimodal type: {item.get('type')}")
 
-                elif image_message["type"] == "video":
-                    video_bytes = image_message.get("video")
-                    if video_bytes is None:
-                        continue
-                    frames, meta = self._load_and_process_video(video_bytes, image_message)
+        if self.tokenizer.chat_template is None:
+            raise ValueError("This model does not support chat template.")
 
-                    outputs["video_cnt"] += 1
-                    self._add_video(frames, meta, outputs)
+        chat_template_kwargs = request.get("chat_template_kwargs", {})
+        prompt = self.tokenizer.apply_chat_template(
+            messages,
+            tokenize=False,
+            add_generation_prompt=request.get("add_generation_prompt", True),
+            **chat_template_kwargs,
+        )
+        request["prompt_tokens"] = prompt
 
-                vision_message_index += 1
+        outputs = self.text2ids(prompt, images, videos, image_uuid, video_uuid)
 
-        self._add_text(prompt_token_ids[vision_start_index:], outputs)
-        return self._pack_outputs(outputs)
+        if self.enable_processor_cache:
+            missing_idx = set(missing_idx)
+            hashes_to_cache, items_to_cache = [], []
+            for idx in range(len(mm_items)):
+                if idx in missing_idx:
+                    continue
+                meta = {}
+                t, h, w = outputs["grid_thw"][idx]
+                meta["thw"] = (t, h, w)
+                meta["fps"] = outputs["fps"][idx]
+                hashes_to_cache.append(outputs["mm_hashes"][idx])
+                items_to_cache.append((outputs["images"][idx], meta))
+            self.update_processor_cache(dealer, hashes_to_cache, items_to_cache)
+
+        return outputs
 
     def _add_text(self, tokens, outputs: Dict) -> None:
         """
@@ -316,9 +314,9 @@ class DataProcessor:
         outputs["input_ids"].extend(tokens)
         outputs["token_type_ids"].extend([IDS_TYPE_FLAG["text"]] * num_tokens)
 
-        position_ids = self._compute_text_positions(outputs["cur_position"], num_tokens)
-        outputs["position_ids"].append(position_ids)
-        outputs["cur_position"] = position_ids.max() + 1
+        pos_ids = self._compute_text_positions(outputs["cur_position"], num_tokens)
+        outputs["position_ids"].append(pos_ids)
+        outputs["cur_position"] = pos_ids.max() + 1
 
     def _compute_text_positions(self, start_pos: int, num_tokens: int) -> np.ndarray:
         """
@@ -336,7 +334,7 @@ class DataProcessor:
         position = text_index + start_pos
         return position
 
-    def _add_image(self, img, outputs: Dict) -> None:
+    def _add_image(self, img, outputs: Dict, uuid: Optional[str]) -> None:
         """
         Add image data to model inputs dictionary.
 
@@ -353,23 +351,49 @@ class DataProcessor:
         num_tokens = ret["grid_thw"].prod() // self.image_processor.merge_size**2
         grid_thw = ret["grid_thw"].tolist()
 
+        outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
         outputs["input_ids"].extend([self.image_token_id] * num_tokens)
         outputs["token_type_ids"].extend([IDS_TYPE_FLAG["image"]] * num_tokens)
         outputs["images"].append(ret["pixel_values"])
+        if not uuid:
+            outputs["mm_hashes"].append(MultimodalHasher.hash_features(ret["pixel_values"]))
+        else:
+            outputs["mm_hashes"].append(uuid)
         outputs["grid_thw"].append(grid_thw)
         outputs["image_type_ids"].append(0)
 
         # position_ids
         t, h, w = grid_thw
-        position_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, 0)
-        outputs["position_ids"].append(position_ids)
-        outputs["cur_position"] = position_ids.max() + 1
+        pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, 0)
+        outputs["position_ids"].append(pos_ids)
+        outputs["cur_position"] = pos_ids.max() + 1
+        outputs["fps"].append(0)
 
         numel = h * w
         outputs["vit_seqlen"].append(numel)
         outputs["vit_position_ids"].append(np.arange(numel) % numel)
 
-    def _add_video(self, frames, meta: Dict, outputs: Dict) -> None:
+    def _add_processed_image(self, img_cache: Tuple[np.ndarray, dict], outputs: Dict, uuid: str) -> None:
+        img, meta = img_cache
+        num_tokens = img.shape[0] // self.image_processor.merge_size**2
+
+        outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
+        outputs["input_ids"].extend([self.image_patch_id] * num_tokens)
+        outputs["token_type_ids"].extend([IDS_TYPE_FLAG["image"]] * num_tokens)
+
+        _, h, w = meta["thw"]
+        pos_ids = self._compute_vision_positions(outputs["cur_position"], 1, h, w, 0)
+        outputs["position_ids"].append(pos_ids)
+        outputs["cur_position"] = pos_ids.max() + 1
+
+        outputs["images"].append(img)
+        outputs["mm_hashes"].append(uuid)
+        outputs["grid_thw"].append(np.array([[1, h, w]]))
+        outputs["image_type_ids"].append(0)
+
+        outputs["fps"].append(0)
+
+    def _add_video(self, frames, meta: Dict, outputs: Dict, uuid: Optional[str]) -> None:
         """
         Add video data to model inputs dictionary.
 
@@ -387,24 +411,52 @@ class DataProcessor:
         num_tokens = ret["image_grid_thw"].prod() // self.image_processor.merge_size**2
         grid_thw = ret["image_grid_thw"].tolist()
 
+        outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
         outputs["input_ids"].extend([self.video_token_id] * num_tokens)
         outputs["token_type_ids"].extend([IDS_TYPE_FLAG["video"]] * num_tokens)
         outputs["images"].append(ret["pixel_values"])
+        if not uuid:
+            outputs["mm_hashes"].append(MultimodalHasher.hash_features(ret["pixel_values"]))
+        else:
+            outputs["mm_hashes"].append(uuid)
         outputs["grid_thw"].append(grid_thw)
         outputs["image_type_ids"].extend([1] * grid_thw[0])
 
         fps = meta["fps"]
         second_per_grid_t = self.temporal_conv_size / fps
         t, h, w = grid_thw
-        position_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
+        pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
 
-        outputs["position_ids"].append(position_ids)
-        outputs["cur_position"] = position_ids.max() + 1
+        outputs["position_ids"].append(pos_ids)
+        outputs["cur_position"] = pos_ids.max() + 1
+        outputs["fps"].append(fps)
 
         numel = h * w
         outputs["vit_seqlen"].append(numel)
         outputs["vit_position_ids"].append(np.arange(numel) % numel)
 
+    def _add_processed_video(self, frames_cache: Tuple[np.ndarray, dict], outputs: Dict, uuid: str) -> None:
+        frames, meta = frames_cache
+        num_tokens = frames.shape[0] // self.image_processor.merge_size**2
+
+        t, h, w = meta["thw"]
+        outputs["images"].append(frames)
+        outputs["mm_hashes"].append(uuid)
+        outputs["grid_thw"].append(np.array([[t, h, w]]))
+
+        outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
+        outputs["input_ids"].extend([self.image_patch_id] * num_tokens)
+        outputs["token_type_ids"].extend([IDS_TYPE_FLAG["video"]] * num_tokens)
+        outputs["image_type_ids"].extend([1] * t)
+
+        fps = meta["fps"]
+        second_per_grid_t = self.temporal_conv_size / fps
+        pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
+        outputs["position_ids"].append(pos_ids)
+        outputs["cur_position"] = pos_ids.max() + 1
+
+        outputs["fps"].append(fps)
+
     def _compute_vision_positions(
         self, start_pos: int, t: int, h: int, w: int, second_per_grid_t: float
     ) -> np.ndarray:
@@ -438,6 +490,78 @@ class DataProcessor:
         position = np.stack([t_index, h_index, w_index]) + start_pos
         return position
 
+    def _load_and_process_video(self, url: str, item: Dict) -> Tuple[np.ndarray, Dict]:
+        """
+        Load and preprocess video into frames.
+
+        Args:
+            url: Video file path or bytes
+            item: Dictionary containing processing parameters
+
+        Returns:
+            tuple: (frames, metadata) where:
+                - frames: Processed video frames as numpy array
+                - metadata: Updated video metadata dictionary
+        """
+        reader, meta, _ = read_video_decord(url, save_to_disk=False)
+
+        # Apply frame sampling if fps or target_frames specified
+        fps = item.get("fps", self.fps)
+        num_frames = item.get("target_frames", self.target_frames)
+
+        frame_indices = list(range(meta["num_of_frame"]))
+        if fps > 0 or num_frames > 0:
+            # Get frame sampling constraints
+            min_frames = item.get("min_frames", self.min_frames)
+            max_frames = item.get("max_frames", self.max_frames)
+
+            # Sample frames according to specifications
+            frame_indices = sample_frames(
+                frame_factor=self.temporal_conv_size,  # Ensure divisible by temporal patch size
+                min_frames=min_frames,
+                max_frames=max_frames,
+                metadata=meta,
+                fps=fps,
+                num_frames=num_frames,
+            )
+
+            # Update metadata with new frame count and fps
+            meta["num_of_frame"] = len(frame_indices)
+            if fps is not None:
+                meta["fps"] = fps  # Use specified fps
+                meta["duration"] = len(frame_indices) / fps
+            else:
+                meta["fps"] = len(frame_indices) / meta["duration"]  # Calculate fps from sampled frames
+
+        frames = []
+        for idx in frame_indices:
+            frame = reader[idx].asnumpy()
+            image = Image.fromarray(frame, "RGB")
+            frames.append(image)
+        frames = np.stack([np.array(f.convert("RGB")) for f in frames], axis=0)
+
+        return frames, meta
+
+    def get_processor_cache(self, socket, mm_hashes: list[str]) -> list:
+        """
+        get cache correspond to given hash values
+        """
+        req = pickle.dumps(mm_hashes)
+        socket.send_multipart([b"", req])
+        _, resp = socket.recv_multipart()
+        mm_items = pickle.loads(resp)
+        data_processor_logger.info(f"Get cache of mm_hashes: {mm_hashes}")
+
+        return mm_items
+
+    def update_processor_cache(self, socket, mm_hashes: list[str], mm_items):
+        """
+        update cache data
+        """
+        req = pickle.dumps((mm_hashes, mm_items))
+        socket.send_multipart([b"", req])
+        data_processor_logger.info(f"Update cache of mm_hashes: {mm_hashes}")
+
     def apply_chat_template(self, request):
         """
         Apply chat template to convert messages into token sequence.
diff --git a/fastdeploy/input/paddleocr_vl_processor/process_video.py b/fastdeploy/input/paddleocr_vl_processor/process_video.py
new file mode 100644
index 000000000..c7089d26d
--- /dev/null
+++ b/fastdeploy/input/paddleocr_vl_processor/process_video.py
@@ -0,0 +1,82 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
+import math
+from typing import Optional, Union
+
+import numpy as np
+
+
+def sample_frames(
+    frame_factor: int,
+    min_frames: int,
+    max_frames: int,
+    metadata: Optional[dict] = None,
+    fps: Optional[Union[int, float]] = None,
+    num_frames: Optional[int] = None,
+):
+    """
+    Sample frames from video according to specified criteria.
+
+    Args:
+        frame_factor: Ensure sampled frames are multiples of this factor
+        min_frames: Minimum number of frames to sample
+        max_frames: Maximum number of frames to sample
+        metadata: Video metadata containing fps information
+        fps: Target frames per second for sampling
+        num_frames: Exact number of frames to sample
+
+    Returns:
+        np.ndarray: Sampled video frames
+
+    Raises:
+        ValueError: If both fps and num_frames are specified,
+            or if required metadata is missing,
+            or if requested frames exceed available frames
+    """
+    if fps > 0 and num_frames > 0:
+        raise ValueError("`num_frames` and `fps` are mutually exclusive arguments, please use only one!")
+
+    total_num_frames = metadata["num_of_frame"]
+
+    # If num_frames is not given but fps is, calculate num_frames from fps
+    if num_frames > 0:
+        num_frames = round(num_frames / frame_factor) * frame_factor
+    elif fps > 0:
+        if metadata is None:
+            raise ValueError(
+                "Asked to sample `fps` frames per second but no video metadata was provided which is required when sampling with `fps`. "
+                "Please pass in `VideoMetadata` object or use a fixed `num_frames` per input video"
+            )
+        max_frames = math.floor(min(max_frames, total_num_frames) / frame_factor) * frame_factor
+        num_frames = total_num_frames / metadata["fps"] * fps
+        num_frames = min(min(max(num_frames, min_frames), max_frames), total_num_frames)
+        num_frames = math.floor(num_frames / frame_factor) * frame_factor
+        if num_frames > total_num_frames:
+            raise ValueError(
+                f"Video can't be sampled. The inferred `num_frames={num_frames}` exceeds `total_num_frames={total_num_frames}`. "
+                "Decrease `num_frames` or `fps` for sampling."
+            )
+
+    # Calculate frame indices based on sampling strategy
+    if num_frames > 0:
+        # Evenly spaced sampling for target frame count
+        indices = np.arange(0, total_num_frames, total_num_frames / num_frames).astype(np.int32)
+    else:
+        # Keep all frames if no sampling requested
+        indices = np.arange(0, total_num_frames).astype(np.int32)
+
+    return indices
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 9ea08542b..30bf8b103 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -814,6 +814,8 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
     architecture = fd_config.model_config.architectures[0]
     if "PaddleOCR" in architecture:
         envs.FD_ENABLE_MAX_PREFILL = 1
+        fd_config.cache_config.enable_prefix_caching = False
+        fd_config.cache_config.max_encoder_cache = 0
 
     return fd_config
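
Note on the processor cache protocol: request2ids connects a zmq.DEALER socket to ipc:///dev/shm/processor_cache.ipc and exchanges pickled payloads through get_processor_cache / update_processor_cache. The server side of that socket is not part of this diff; the sketch below only mirrors the wire format implied by the client code (a pickled list of hashes for a lookup, a pickled (hashes, items) tuple for an update). The function name and the plain in-memory dict are assumptions for illustration, not FastDeploy's actual cache service.

    import pickle

    import zmq


    def serve_processor_cache(endpoint: str = "ipc:///dev/shm/processor_cache.ipc"):
        """Illustrative ROUTER-side loop matching the DEALER framing used in process.py."""
        cache = {}  # mm_hash -> (features, meta); a real server would bound its memory
        router = zmq.Context().socket(zmq.ROUTER)
        router.bind(endpoint)
        while True:
            # The DEALER sends [b"", payload]; ROUTER prepends the peer identity frame.
            identity, _, payload = router.recv_multipart()
            msg = pickle.loads(payload)
            if isinstance(msg, tuple):  # update_processor_cache: (hashes, items)
                hashes, items = msg
                cache.update(zip(hashes, items))
            else:  # get_processor_cache: list of hashes; missing entries come back as None
                reply = pickle.dumps([cache.get(h) for h in msg])
                router.send_multipart([identity, b"", reply])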
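
A worked example of the sampling arithmetic in sample_frames (illustrative numbers, not taken from the diff): for a 300-frame clip whose metadata reports 30 fps, sampling with fps=2, frame_factor=2, min_frames=4 and max_frames=768 infers 300 / 30 * 2 = 20 frames; 20 is already a multiple of frame_factor and within the min/max bounds, so 20 evenly spaced indices, 15 frames apart, are returned.

    from fastdeploy.input.paddleocr_vl_processor.process_video import sample_frames

    meta = {"num_of_frame": 300, "fps": 30}
    # num_frames=-1 mirrors the default video_target_frames used by DataProcessor
    indices = sample_frames(frame_factor=2, min_frames=4, max_frames=768, metadata=meta, fps=2, num_frames=-1)
    print(len(indices), indices[:3])  # 20 [ 0 15 30]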