mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
* support mm prefix caching * update code * fix mm_hashes * support encoder cache * add encoder cache * update code * update encoder cache * fix features bug * fix worker bug * support processor cache, need to optimize yet * refactor multimodal data cache * update code * update code * update v1 scheduler * update code * update code * update codestyle * support turn off processor cache and encoder cache * update pre-commit * fix code * solve review * update code * update code * update test case * set processor cache in GiB * update test case * support mm prefix caching for qwen model * fix code style check * update pre-commit * fix unit test * fix unit test * add ci test case * fix rescheduled bug * change text_after_process to prompt_tokens * fix unit test * fix chat template * change model path * [EP] fix adapter bugs (#4572) * Update expert_service.py * Update common_engine.py * Update expert_service.py * fix v1 hang bug (#4573) * fix import image_ops error on some platforms (#4559) * [CLI]Update parameters in bench latecy cli tool and fix collect-env cli tool (#4558) * add collect-env * del files * [Graph Optimization] Add dy_runnable and introduce cudagraph_switch_threshold for cudagraph mode switching (#4578) * add new branch for sot * reorder * fix batch bug * [XPU]Moe uses a new operator (#4585) * [XPU]Moe uses a new operator * [XPU]Moe uses a new operator * update response * [Feature] Support Paddle-OCR (#4396) * init * update code * fix code style & disable thinking * adapt for common_engine.update_mm_requests_chunk_size * use 3d rope * use flash_attn_unpadded * opt siglip * update to be compatible with the latest codebase * fix typo * optim OCR performance * fix bug * fix bug * fix bug * fix bug * normlize name * modify xpu rope * revert logger * fix bug * fix bug * fix bug * support default_v1 * optim performance * fix bug --------- Co-authored-by: root <root@szzj-acg-tge1-fdda9.szzj.baidu.com> Co-authored-by: zhangyue66 <zhangyue66@baidu.com> * [DataProcessor] add reasoning_tokens into usage info (#4520) * add reasoning_tokens into usage info initial commit * add unit tests * modify unit test * modify and add unit tests * fix unit test * move steam usage to processor * modify processor * modify test_logprobs * modify test_logprobs.py * modify stream reasoning tokens accumulation * fix unit test * perf: Optimize task queue communication from engine to worker (#4531) * perf: Optimize task queue communication from engine to worker * perf: get_tasks to numpy * perf: get_tasks remove to_numpy * fix: request & replace ENV * remove test_e2w_perf.py * fix code style --------- Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> * Clean up ports after processing results (#4587) * [CI] Add /re-run command in PR comments to restart failed CI workflows (#4593) * [Others] api server exits when worker process is dead (#3271) * [fix] fix terminal hangs when worker process is dead * [chore] change sleep time of monitor * [chore] remove redundant comments * update docs --------- Co-authored-by: ApplEOFDiscord <wwy640130@163.com> Co-authored-by: ApplEOFDiscord <31272106+ApplEOFDiscord@users.noreply.github.com> Co-authored-by: ltd0924 <32387785+ltd0924@users.noreply.github.com> Co-authored-by: yinwei <yinwei_hust@163.com> Co-authored-by: JYChen <zoooo0820@qq.com> Co-authored-by: qwes5s5 <45442318+qwes5s5@users.noreply.github.com> Co-authored-by: Ryan <zihaohuang@aliyun.com> Co-authored-by: yyssys <atyangshuang@foxmail.com> Co-authored-by: ming1753 <61511741+ming1753@users.noreply.github.com> Co-authored-by: root <root@szzj-acg-tge1-fdda9.szzj.baidu.com> Co-authored-by: zhangyue66 <zhangyue66@baidu.com> Co-authored-by: kxz2002 <115912648+kxz2002@users.noreply.github.com> Co-authored-by: SunLei <sunlei5788@gmail.com> Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> Co-authored-by: Zhang Yulong <35552275+ZhangYulongg@users.noreply.github.com> Co-authored-by: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com> Co-authored-by: 李泳桦 <39643373+liyonghua0910@users.noreply.github.com>
164 lines
5.5 KiB
Python
164 lines
5.5 KiB
Python
"""
|
|
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
|
|
import pickle
|
|
import threading
|
|
from abc import ABC, abstractmethod
|
|
from collections import OrderedDict
|
|
from typing import Any, Tuple
|
|
|
|
import numpy as np
|
|
import zmq
|
|
|
|
from fastdeploy import envs
|
|
from fastdeploy.engine.request import ImagePosition
|
|
from fastdeploy.utils import get_logger
|
|
|
|
logger = get_logger("prefix_cache_manager", "cache_manager.log")
|
|
|
|
|
|
class MultimodalLRUCache(ABC):
|
|
"""
|
|
General lru cache for multimodal data
|
|
"""
|
|
|
|
def __init__(self, max_cache_size):
|
|
self.cache = OrderedDict()
|
|
self.current_cache_size = 0
|
|
self.max_cache_size = max_cache_size
|
|
|
|
def apply_cache(self, mm_hashes: list[str], mm_items: list[Any]) -> list[str]:
|
|
"""
|
|
apply data cache, return evicted data
|
|
"""
|
|
assert len(mm_hashes) == len(mm_items), "mm_hashes and mm_items should have same length"
|
|
|
|
evicted_hashes = []
|
|
for idx in range(len(mm_hashes)):
|
|
if mm_hashes[idx] in self.cache:
|
|
self.cache.move_to_end(mm_hashes[idx])
|
|
else:
|
|
item_size = self.get_item_size(mm_items[idx])
|
|
if self.current_cache_size + item_size >= self.max_cache_size:
|
|
if item_size > self.max_cache_size:
|
|
# cannot be inserted even if we clear all cached data, skip it directly
|
|
continue
|
|
needed = item_size - (self.max_cache_size - self.current_cache_size)
|
|
evicted_hashes.extend(self.evict_cache(needed))
|
|
self.cache[mm_hashes[idx]] = mm_items[idx]
|
|
self.current_cache_size += item_size
|
|
|
|
return evicted_hashes
|
|
|
|
def evict_cache(self, needed: int) -> list[str]:
|
|
"""
|
|
evict data cache with needed size
|
|
"""
|
|
reduced_size, evicted_hashes = 0, []
|
|
while reduced_size < needed and len(self.cache):
|
|
mm_hash, mm_item = self.cache.popitem(last=False)
|
|
evicted_hashes.append(mm_hash)
|
|
reduced_size += self.get_item_size(mm_item)
|
|
self.current_cache_size -= self.get_item_size(mm_item)
|
|
|
|
return evicted_hashes
|
|
|
|
def get_cache(self, mm_hashes: list[str]) -> list[Any]:
|
|
"""
|
|
get cached data correspond to given hash values
|
|
"""
|
|
mm_items = []
|
|
for mm_hash in mm_hashes:
|
|
if mm_hash not in self.cache:
|
|
mm_items.append(None)
|
|
continue
|
|
mm_items.append(self.cache[mm_hash])
|
|
|
|
return mm_items
|
|
|
|
def clear_cache(self):
|
|
"""
|
|
clear all cached data
|
|
"""
|
|
evicted_hashes = list(self.cache.keys())
|
|
self.cache.clear()
|
|
self.current_cache_size = 0
|
|
|
|
return evicted_hashes
|
|
|
|
@abstractmethod
|
|
def get_item_size(self, item: Any) -> int:
|
|
raise NotImplementedError("Subclasses must define how to get size of an item")
|
|
|
|
|
|
class EncoderCacheManager(MultimodalLRUCache):
|
|
"""
|
|
EncoderCacheManager is used to cache image features
|
|
"""
|
|
|
|
def __init__(self, max_encoder_cache):
|
|
super().__init__(max_encoder_cache)
|
|
|
|
def get_item_size(self, item: ImagePosition) -> int:
|
|
return item.length
|
|
|
|
|
|
class ProcessorCacheManager(MultimodalLRUCache):
|
|
"""
|
|
ProcessorCacheManager is used to cache processed data
|
|
"""
|
|
|
|
def __init__(self, max_processor_cache):
|
|
super().__init__(max_processor_cache)
|
|
|
|
self.context = zmq.Context()
|
|
|
|
self.router = self.context.socket(zmq.ROUTER)
|
|
self.router.setsockopt(zmq.SNDHWM, int(envs.FD_ZMQ_SNDHWM))
|
|
self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
|
|
self.router.setsockopt(zmq.SNDTIMEO, -1)
|
|
self.router.bind("ipc:///dev/shm/processor_cache.ipc")
|
|
|
|
self.poller = zmq.Poller()
|
|
self.poller.register(self.router, zmq.POLLIN)
|
|
|
|
self.handler_thread = threading.Thread(target=self.cache_request_handler, daemon=True)
|
|
self.handler_thread.start()
|
|
|
|
def get_item_size(self, item: Tuple[np.ndarray, dict]) -> int:
|
|
return item[0].nbytes
|
|
|
|
def cache_request_handler(self):
|
|
try:
|
|
while True:
|
|
events = dict(self.poller.poll())
|
|
|
|
if self.router in events:
|
|
client, _, content = self.router.recv_multipart()
|
|
req = pickle.loads(content)
|
|
|
|
if isinstance(req, tuple):
|
|
# apply cache request, in format of (mm_hashes, mm_items)
|
|
self.apply_cache(req[0], req[1])
|
|
logger.info(f"Apply processor cache of mm_hashes: {req[0]}")
|
|
else:
|
|
# get cache request
|
|
resp = self.get_cache(req)
|
|
logger.info(f"Get processor cache of mm_hashes: {req}")
|
|
self.router.send_multipart([client, b"", pickle.dumps(resp)])
|
|
except Exception as e:
|
|
logger.error(f"Error happened while handling processor cache request: {e}")
|