[BugFix] fix paddleocr prefix cache bug (#4625)

* fix paddleocr prefix cache bug

* disable prefix-caching in ocr
This commit is contained in:
ming1753
2025-10-28 21:38:12 +08:00
committed by GitHub
parent fff5fb5e39
commit 561b9f38d3
5 changed files with 346 additions and 124 deletions

View File

@@ -15,16 +15,23 @@
# limitations under the License.
"""
from typing import Any, Dict, List, Union
import pickle
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import zmq
from paddleformers.transformers import AutoTokenizer
from PIL import Image
from fastdeploy.engine.request import ImagePosition
from fastdeploy.entrypoints.chat_utils import parse_chat_messages
from fastdeploy.input.ernie4_5_vl_processor import read_video_decord
from fastdeploy.input.utils import IDS_TYPE_FLAG
from fastdeploy.multimodal.hasher import MultimodalHasher
from fastdeploy.utils import data_processor_logger
from .image_processor import ImageProcessor
from .process_video import sample_frames
class DataProcessor:
@@ -48,8 +55,11 @@ class DataProcessor:
def __init__(
self,
model_path: str,
enable_processor_cache: bool = False,
video_min_frames: int = 4,
video_max_frames: int = 768,
video_target_frames: int = -1,
video_fps: int = -1,
tokens_per_second: int = 2,
tokenizer=None,
**kwargs,
@@ -66,6 +76,8 @@ class DataProcessor:
"""
self.min_frames = video_min_frames
self.max_frames = video_max_frames
self.target_frames = video_target_frames
self.fps = video_fps
# Initialize tokenizer with left padding and fast tokenizer
if tokenizer is None:
@@ -74,13 +86,13 @@ class DataProcessor:
else:
self.tokenizer = tokenizer
self.image_processor = ImageProcessor.from_pretrained(model_path) # Initialize image processor
self.enable_processor_cache = enable_processor_cache
# Convolution sizes for patch aggregation
self.spatial_conv_size = self.image_processor.merge_size
self.temporal_conv_size = self.image_processor.temporal_patch_size
# Special tokens and IDs
self.image_token = "<|IMAGE_PLACEHOLDER|>"
self.video_token = "<|video_pad|>"
@@ -100,41 +112,7 @@ class DataProcessor:
"assistant": "Assistant: ",
}
def _pack_outputs(self, outputs):
"""
Pack and convert all output data into numpy arrays with appropriate types.
Args:
outputs (dict): Dictionary containing model outputs with keys:
- images: List of visual features
- grid_thw: List of spatial dimensions
- image_type_ids: List of content type indicators
- input_ids: List of token IDs
- token_type_ids: List of type identifiers
- position_ids: List of position embeddings
Returns:
dict: Processed outputs with all values converted to numpy arrays
"""
# Process visual outputs - stack if exists or set to None if empty
if not outputs["images"]:
outputs["images"] = None # No images case
outputs["grid_thw"] = None # No spatial dimensions
outputs["image_type_ids"] = None # No type IDs
else:
outputs["images"] = np.vstack(outputs["images"]) # Stack image features vertically
outputs["grid_thw"] = np.vstack(outputs["grid_thw"]) # Stack spatial dimensions
outputs["image_type_ids"] = np.array(outputs["image_type_ids"]) # Convert to numpy array
# Convert all outputs to numpy arrays with appropriate types
outputs["input_ids"] = np.array(outputs["input_ids"], dtype=np.int64) # Token IDs as int64
outputs["token_type_ids"] = np.array(outputs["token_type_ids"], dtype=np.int64) # Type IDs as int64
outputs["position_ids"] = np.concatenate(
outputs["position_ids"], axis=1, dtype=np.int64
) # Concatenate position IDs
return outputs
def text2ids(self, text, images=None, videos=None):
def text2ids(self, text, images=None, videos=None, image_uuid=None, video_uuid=None):
"""
Convert text with image/video placeholders into model inputs.
@@ -142,6 +120,8 @@ class DataProcessor:
text: Input text with <|image@placeholder|> and <|video@placeholder|> markers
images: List of PIL Images corresponding to image placeholders
videos: List of video data corresponding to video placeholders
image_uuid: List of unique identifiers for each image, used for caching or hashing.
video_uuid: List of unique identifiers for each video, used for caching or hashing.
Returns:
Dict containing:
@@ -162,11 +142,14 @@ class DataProcessor:
"image_type_ids": [],
"labels": [],
"cur_position": 0,
"pic_cnt": 0,
"video_cnt": 0,
"fps": [],
"mm_positions": [],
"mm_hashes": [],
"vit_seqlen": [],
"vit_position_ids": [],
}
# Define placeholders and their lengths
IMAGE_PLACEHOLDER = self.image_token
VIDEO_PLACEHOLDER = self.video_token
@@ -188,23 +171,30 @@ class DataProcessor:
break
if ed == image_pos:
outputs["pic_cnt"] += 1
self._add_image(images[image_idx], outputs)
image = images[image_idx]
uuid = image_uuid[image_idx] if image_uuid else None
if not isinstance(image, tuple):
self._add_image(image, outputs, uuid)
else:
self._add_processed_image(image, outputs, uuid)
image_idx += 1
st = ed + IMAGE_PLACEHOLDER_LEN
else:
item = videos[video_idx]
if isinstance(item, dict):
frames, meta = self._load_and_process_video(item["video"], item)
uuid = video_uuid[video_idx] if video_uuid else None
if not isinstance(item, tuple):
if isinstance(item, dict):
frames, meta = self._load_and_process_video(item["video"], item)
else:
frames, meta = self._load_and_process_video(item, {})
self._add_video(frames, meta, outputs, uuid)
else:
frames, meta = self._load_and_process_video(item, {})
outputs["video_cnt"] += 1
self._add_video(frames, meta, outputs)
# cached frames are already processed
self._add_processed_video(item, outputs, uuid)
video_idx += 1
st = ed + VIDEO_PLACEHOLDER_LEN
return self._pack_outputs(outputs)
return outputs
def request2ids(
self, request: Dict[str, Any], tgts: List[str] = None
@@ -222,76 +212,84 @@ class DataProcessor:
Dict with same structure as text2ids() output
"""
outputs = {
"input_ids": [],
"token_type_ids": [],
"position_ids": [],
"images": [],
"grid_thw": [],
"image_type_ids": [],
"labels": [],
"cur_position": 0,
"pic_cnt": 0,
"video_cnt": 0,
"vit_seqlen": [],
"vit_position_ids": [],
}
# Parse and validate chat messages
messages = parse_chat_messages(request.get("messages"))
image_message_list = [] # Store visual content messages
mm_items = []
for msg in messages:
role = msg.get("role")
assert role in self.role_prefixes, f"Unsupported role: {role}"
# Normalize content to list format
content_items = msg.get("content")
if not isinstance(content_items, list):
content_items = [content_items]
content = msg.get("content")
if not isinstance(content, list):
content = [content]
# Collect all visual content items
for item in content_items:
if isinstance(item, dict) and item.get("type") in ["image", "video"]:
image_message_list.append(item)
for item in content:
if item.get("type") in ["image", "video"]:
mm_items.append(item)
raw_messages = request["messages"]
request["messages"] = messages
missing_hashes, missing_idx = [], []
for idx, item in enumerate(mm_items):
if not item.get("data"):
# raw data not provided, should be retrieved from processor cache
missing_hashes.append(item.get("uuid"))
missing_idx.append(idx)
prompt_token_ids = self.apply_chat_template(request)
if len(prompt_token_ids) == 0:
raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs")
request["messages"] = raw_messages
if len(missing_hashes) > 0 and not self.enable_processor_cache:
raise ValueError("Missing items cannot be retrieved without processor cache.")
vision_start_index = 0
vision_message_index = 0
for i in range(len(prompt_token_ids)):
if prompt_token_ids[i] == self.vision_start_id:
self._add_text(prompt_token_ids[vision_start_index : i + 1], outputs)
if self.enable_processor_cache:
context = zmq.Context()
dealer = context.socket(zmq.DEALER)
dealer.connect("ipc:///dev/shm/processor_cache.ipc")
vision_start_index = i + 1
image_message = image_message_list[vision_message_index]
missing_items = self.get_processor_cache(dealer, missing_hashes)
for idx in range(len(missing_items)):
if not missing_items[idx]:
raise ValueError(f"Missing item {idx} not found in processor cache")
mm_items[missing_idx[idx]]["data"] = missing_items[idx]
if image_message["type"] == "image":
img = image_message.get("image")
if img is None:
continue
outputs["pic_cnt"] += 1
self._add_image(img, outputs)
images, videos = [], []
image_uuid, video_uuid = [], []
for item in mm_items:
if item.get("type") == "image":
images.append(item["data"])
image_uuid.append(item["uuid"])
elif item.get("type") == "video":
videos.append(item["data"])
video_uuid.append(item["uuid"])
else:
raise ValueError(f"Unsupported multimodal type: {item.get('type')}")
elif image_message["type"] == "video":
video_bytes = image_message.get("video")
if video_bytes is None:
continue
frames, meta = self._load_and_process_video(video_bytes, image_message)
if self.tokenizer.chat_template is None:
raise ValueError("This model does not support chat template.")
outputs["video_cnt"] += 1
self._add_video(frames, meta, outputs)
chat_template_kwargs = request.get("chat_template_kwargs", {})
prompt = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=request.get("add_generation_prompt", True),
**chat_template_kwargs,
)
request["prompt_tokens"] = prompt
vision_message_index += 1
outputs = self.text2ids(prompt, images, videos, image_uuid, video_uuid)
self._add_text(prompt_token_ids[vision_start_index:], outputs)
return self._pack_outputs(outputs)
if self.enable_processor_cache:
missing_idx = set(missing_idx)
hashes_to_cache, items_to_cache = [], []
for idx in range(len(mm_items)):
if idx in missing_idx:
continue
meta = {}
t, h, w = outputs["grid_thw"][idx]
meta["thw"] = (t, h, w)
meta["fps"] = outputs["fps"][idx]
hashes_to_cache.append(outputs["mm_hashes"][idx])
items_to_cache.append((outputs["images"][idx], meta))
self.update_processor_cache(dealer, hashes_to_cache, items_to_cache)
return outputs
def _add_text(self, tokens, outputs: Dict) -> None:
"""
@@ -316,9 +314,9 @@ class DataProcessor:
outputs["input_ids"].extend(tokens)
outputs["token_type_ids"].extend([IDS_TYPE_FLAG["text"]] * num_tokens)
position_ids = self._compute_text_positions(outputs["cur_position"], num_tokens)
outputs["position_ids"].append(position_ids)
outputs["cur_position"] = position_ids.max() + 1
pos_ids = self._compute_text_positions(outputs["cur_position"], num_tokens)
outputs["position_ids"].append(pos_ids)
outputs["cur_position"] = pos_ids.max() + 1
def _compute_text_positions(self, start_pos: int, num_tokens: int) -> np.ndarray:
"""
@@ -336,7 +334,7 @@ class DataProcessor:
position = text_index + start_pos
return position
def _add_image(self, img, outputs: Dict) -> None:
def _add_image(self, img, outputs: Dict, uuid: Optional[str]) -> None:
"""
Add image data to model inputs dictionary.
@@ -353,23 +351,49 @@ class DataProcessor:
num_tokens = ret["grid_thw"].prod() // self.image_processor.merge_size**2
grid_thw = ret["grid_thw"].tolist()
outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
outputs["input_ids"].extend([self.image_token_id] * num_tokens)
outputs["token_type_ids"].extend([IDS_TYPE_FLAG["image"]] * num_tokens)
outputs["images"].append(ret["pixel_values"])
if not uuid:
outputs["mm_hashes"].append(MultimodalHasher.hash_features(ret["pixel_values"]))
else:
outputs["mm_hashes"].append(uuid)
outputs["grid_thw"].append(grid_thw)
outputs["image_type_ids"].append(0)
# position_ids
t, h, w = grid_thw
position_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, 0)
outputs["position_ids"].append(position_ids)
outputs["cur_position"] = position_ids.max() + 1
pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, 0)
outputs["position_ids"].append(pos_ids)
outputs["cur_position"] = pos_ids.max() + 1
outputs["fps"].append(0)
numel = h * w
outputs["vit_seqlen"].append(numel)
outputs["vit_position_ids"].append(np.arange(numel) % numel)
def _add_video(self, frames, meta: Dict, outputs: Dict) -> None:
def _add_processed_image(self, img_cache: Tuple[np.ndarray, dict], outputs: Dict, uuid: str) -> None:
img, meta = img_cache
num_tokens = img.shape[0] // self.image_processor.merge_size**2
outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
outputs["input_ids"].extend([self.image_patch_id] * num_tokens)
outputs["token_type_ids"].extend([IDS_TYPE_FLAG["image"]] * num_tokens)
_, h, w = meta["thw"]
pos_ids = self._compute_vision_positions(outputs["cur_position"], 1, h, w, 0)
outputs["position_ids"].append(pos_ids)
outputs["cur_position"] = pos_ids.max() + 1
outputs["images"].append(img)
outputs["mm_hashes"].append(uuid)
outputs["grid_thw"].append(np.array([[1, h, w]]))
outputs["image_type_ids"].append(0)
outputs["fps"].append(0)
def _add_video(self, frames, meta: Dict, outputs: Dict, uuid: Optional[str]) -> None:
"""
Add video data to model inputs dictionary.
@@ -387,24 +411,52 @@ class DataProcessor:
num_tokens = ret["image_grid_thw"].prod() // self.image_processor.merge_size**2
grid_thw = ret["image_grid_thw"].tolist()
outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
outputs["input_ids"].extend([self.video_token_id] * num_tokens)
outputs["token_type_ids"].extend([IDS_TYPE_FLAG["video"]] * num_tokens)
outputs["images"].append(ret["pixel_values"])
if not uuid:
outputs["mm_hashes"].append(MultimodalHasher.hash_features(ret["pixel_values"]))
else:
outputs["mm_hashes"].append(uuid)
outputs["grid_thw"].append(grid_thw)
outputs["image_type_ids"].extend([1] * grid_thw[0])
fps = meta["fps"]
second_per_grid_t = self.temporal_conv_size / fps
t, h, w = grid_thw
position_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
outputs["position_ids"].append(position_ids)
outputs["cur_position"] = position_ids.max() + 1
outputs["position_ids"].append(pos_ids)
outputs["cur_position"] = pos_ids.max() + 1
outputs["fps"].append(fps)
numel = h * w
outputs["vit_seqlen"].append(numel)
outputs["vit_position_ids"].append(np.arange(numel) % numel)
def _add_processed_video(self, frames_cache: Tuple[np.ndarray, dict], outputs: Dict, uuid: str) -> None:
frames, meta = frames_cache
num_tokens = frames.shape[0] // self.image_processor.merge_size**2
t, h, w = meta["thw"]
outputs["images"].append(frames)
outputs["mm_hashes"].append(uuid)
outputs["grid_thw"].append(np.array([[t, h, w]]))
outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens))
outputs["input_ids"].extend([self.image_patch_id] * num_tokens)
outputs["token_type_ids"].extend([IDS_TYPE_FLAG["video"]] * num_tokens)
outputs["image_type_ids"].extend([1] * t)
fps = meta["fps"]
second_per_grid_t = self.temporal_conv_size / fps
pos_ids = self._compute_vision_positions(outputs["cur_position"], t, h, w, second_per_grid_t)
outputs["position_ids"].append(pos_ids)
outputs["cur_position"] = pos_ids.max() + 1
outputs["fps"].append(fps)
def _compute_vision_positions(
self, start_pos: int, t: int, h: int, w: int, second_per_grid_t: float
) -> np.ndarray:
@@ -438,6 +490,78 @@ class DataProcessor:
position = np.stack([t_index, h_index, w_index]) + start_pos
return position
def _load_and_process_video(self, url: str, item: Dict) -> Tuple[np.ndarray, Dict]:
"""
Load and preprocess video into frames.
Args:
url: Video file path or bytes
item: Dictionary containing processing parameters
Returns:
tuple: (frames, metadata) where:
- frames: Processed video frames as numpy array
- metadata: Updated video metadata dictionary
"""
reader, meta, _ = read_video_decord(url, save_to_disk=False)
# Apply frame sampling if fps or target_frames specified
fps = item.get("fps", self.fps)
num_frames = item.get("target_frames", self.target_frames)
frame_indices = list(range(meta["num_of_frame"]))
if fps > 0 or num_frames > 0:
# Get frame sampling constraints
min_frames = item.get("min_frames", self.min_frames)
max_frames = item.get("max_frames", self.max_frames)
# Sample frames according to specifications
frame_indices = sample_frames(
frame_factor=self.temporal_conv_size, # Ensure divisible by temporal patch size
min_frames=min_frames,
max_frames=max_frames,
metadata=meta,
fps=fps,
num_frames=num_frames,
)
# Update metadata with new frame count and fps
meta["num_of_frame"] = len(frame_indices)
if fps is not None:
meta["fps"] = fps # Use specified fps
meta["duration"] = len(frame_indices) / fps
else:
meta["fps"] = len(frame_indices) / meta["duration"] # Calculate fps from sampled frames
frames = []
for idx in frame_indices:
frame = reader[idx].asnumpy()
image = Image.fromarray(frame, "RGB")
frames.append(image)
frames = np.stack([np.array(f.convert("RGB")) for f in frames], axis=0)
return frames, meta
def get_processor_cache(self, socket, mm_hashes: list[str]) -> list:
"""
get cache correspond to given hash values
"""
req = pickle.dumps(mm_hashes)
socket.send_multipart([b"", req])
_, resp = socket.recv_multipart()
mm_items = pickle.loads(resp)
data_processor_logger.info(f"Get cache of mm_hashes: {mm_hashes}")
return mm_items
def update_processor_cache(self, socket, mm_hashes: list[str], mm_items):
"""
update cache data
"""
req = pickle.dumps((mm_hashes, mm_items))
socket.send_multipart([b"", req])
data_processor_logger.info(f"Update cache of mm_hashes: {mm_hashes}")
def apply_chat_template(self, request):
"""
Apply chat template to convert messages into token sequence.