mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-11-01 20:32:52 +08:00
[bugfix] kill cache_transfer_manager process (#4401)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
Publish Job / publish_pre_check (push) Has been cancelled
Publish Job / print_publish_pre_check_outputs (push) Has been cancelled
Publish Job / FD-Clone-Linux (push) Has been cancelled
Publish Job / Show Code Archive Output (push) Has been cancelled
Publish Job / BUILD_SM8090 (push) Has been cancelled
Publish Job / BUILD_SM8689 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8090 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8689 (push) Has been cancelled
Publish Job / Run FD Image Build (push) Has been cancelled
Publish Job / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
Publish Job / Run FastDeploy LogProb Tests (push) Has been cancelled
Publish Job / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
Publish Job / Run Base Tests (push) Has been cancelled
Publish Job / Run Accuracy Tests (push) Has been cancelled
Publish Job / Run Stable Tests (push) Has been cancelled
CI Images Build / FD-Clone-Linux (push) Has been cancelled
CI Images Build / Show Code Archive Output (push) Has been cancelled
CI Images Build / CI Images Build (push) Has been cancelled
CI Images Build / BUILD_SM8090 (push) Has been cancelled
CI Images Build / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
CI Images Build / Run FastDeploy LogProb Tests (push) Has been cancelled
CI Images Build / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
CI Images Build / Run Base Tests (push) Has been cancelled
CI Images Build / Run Accuracy Tests (push) Has been cancelled
CI Images Build / Run Stable Tests (push) Has been cancelled
CI Images Build / Publish Docker Images Pre Check (push) Has been cancelled
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
Publish Job / publish_pre_check (push) Has been cancelled
Publish Job / print_publish_pre_check_outputs (push) Has been cancelled
Publish Job / FD-Clone-Linux (push) Has been cancelled
Publish Job / Show Code Archive Output (push) Has been cancelled
Publish Job / BUILD_SM8090 (push) Has been cancelled
Publish Job / BUILD_SM8689 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8090 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8689 (push) Has been cancelled
Publish Job / Run FD Image Build (push) Has been cancelled
Publish Job / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
Publish Job / Run FastDeploy LogProb Tests (push) Has been cancelled
Publish Job / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
Publish Job / Run Base Tests (push) Has been cancelled
Publish Job / Run Accuracy Tests (push) Has been cancelled
Publish Job / Run Stable Tests (push) Has been cancelled
CI Images Build / FD-Clone-Linux (push) Has been cancelled
CI Images Build / Show Code Archive Output (push) Has been cancelled
CI Images Build / CI Images Build (push) Has been cancelled
CI Images Build / BUILD_SM8090 (push) Has been cancelled
CI Images Build / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
CI Images Build / Run FastDeploy LogProb Tests (push) Has been cancelled
CI Images Build / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
CI Images Build / Run Base Tests (push) Has been cancelled
CI Images Build / Run Accuracy Tests (push) Has been cancelled
CI Images Build / Run Stable Tests (push) Has been cancelled
CI Images Build / Publish Docker Images Pre Check (push) Has been cancelled
This commit is contained in:
@@ -183,6 +183,18 @@ class CacheTransferManager:
|
||||
suffix=args.engine_pid,
|
||||
create=False,
|
||||
)
|
||||
|
||||
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
|
||||
array_size = min(max_chips_per_node, args.mp_num)
|
||||
worker_healthy_live_array = np.zeros(shape=[array_size], dtype=np.int32)
|
||||
self.worker_healthy_live_signal = IPCSignal(
|
||||
name="worker_healthy_live_signal",
|
||||
array=worker_healthy_live_array,
|
||||
dtype=np.int32,
|
||||
suffix=args.engine_worker_queue_port,
|
||||
create=False,
|
||||
)
|
||||
|
||||
# TODO XPU support RL
|
||||
if not current_platform.is_xpu():
|
||||
threading.Thread(target=self.clear_or_update_caches, args=[args], daemon=True).start()
|
||||
@@ -319,10 +331,26 @@ class CacheTransferManager:
|
||||
logger.debug(f"_do_swap_to_gpu_task: put_transfer_done_signal {result}")
|
||||
logger.info(f"_do_swap_to_gpu_task: put_transfer_done_signal for transfer_task_id {transfer_task_id}")
|
||||
|
||||
def check_work_status(self, time_interval_threashold=None):
    """
    Report whether the worker heartbeat is recent enough to keep running.

    Only slot 0 of ``self.worker_healthy_live_signal.value`` is inspected. It is
    treated as the wall-clock timestamp (seconds) of the last heartbeat; a falsy
    value (0) means no heartbeat has been recorded yet and counts as healthy.

    Args:
        time_interval_threashold: Maximum allowed age of the heartbeat, in
            seconds. When omitted (``None``), ``envs.FD_CACHE_PROC_EXIT_TIMEOUT``
            is looked up at call time rather than frozen when the method is
            defined.

    Returns:
        tuple[bool, str]: ``(True, "")`` when healthy, otherwise
        ``(False, "Worker Service Not Healthy")``.
    """
    if time_interval_threashold is None:
        time_interval_threashold = envs.FD_CACHE_PROC_EXIT_TIMEOUT

    # Read the slot once so the "is it set" test and the age computation use
    # the same value even if the heartbeat is updated in between.
    last_heartbeat = self.worker_healthy_live_signal.value[0]
    if last_heartbeat and time.time() - last_heartbeat > time_interval_threashold:
        return False, "Worker Service Not Healthy"

    return True, ""
|
||||
|
||||
def do_data_transfer(self):
|
||||
"""
|
||||
do data transfer task
|
||||
"""
|
||||
|
||||
consecutive_error_count = 0
|
||||
max_errors = envs.FD_CACHE_PROC_ERROR_COUNT # 连续错误超过此次数后检测work进程是否还存在
|
||||
|
||||
while True:
|
||||
try:
|
||||
if self.rank == 0:
|
||||
@@ -373,6 +401,28 @@ class CacheTransferManager:
|
||||
self.cache_task_queue.barrier3.wait()
|
||||
if self.rank == 0:
|
||||
self.cache_task_queue.barrier3.reset()
|
||||
|
||||
consecutive_error_count = 0
|
||||
|
||||
except (BrokenPipeError, EOFError, ConnectionResetError) as e:
|
||||
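# A leftover cache_transfer_manager process would keep logging this exception until the disk fills up, so check here whether the worker process is still alive.
|
||||
# If the worker process has already exited, this leftover process breaks out of the loop and exits instead of logging exceptions forever.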
|
||||
logger.error(f"[CacheTransferManager] Connection broken: {e}")
|
||||
consecutive_error_count += 1
|
||||
if consecutive_error_count > max_errors:
|
||||
try:
|
||||
status, msg = self.check_work_status()
|
||||
except Exception:
|
||||
status = True
|
||||
|
||||
if status is False:
|
||||
logger.critical(
|
||||
f"The Worker process has been inactive for over {envs.FD_CACHE_PROC_EXIT_TIMEOUT} seconds, and the Cache process will automatically terminate (the waiting timeout can be extended via FD_CACHE_PROC_EXIT_TIMEOUT)."
|
||||
)
|
||||
break
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}")
|
||||
|
||||
|
||||
@@ -118,6 +118,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
|
||||
"FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
|
||||
# Whether to clear cpu cache when clearing model weights.
|
||||
"FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
|
||||
# Seconds without a worker heartbeat after which the cache_transfer_manager process exits.
|
||||
"FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")),
|
||||
# Number of consecutive connection errors in cache_transfer_manager before it checks whether the worker is still alive.
|
||||
"FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
|
||||
}
|
||||
|
||||
|
||||
|
||||
165
tests/cache_manager/test_cache_transfer_manager.py
Normal file
165
tests/cache_manager/test_cache_transfer_manager.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import time
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import fastdeploy.cache_manager.cache_transfer_manager as cache_transfer_manager
|
||||
from fastdeploy.cache_manager.cache_transfer_manager import CacheTransferManager
|
||||
|
||||
|
||||
# ==========================
|
||||
# Args stub used by the tests
|
||||
# ==========================
|
||||
class Args:
    """Minimal stand-in for the argument object handed to CacheTransferManager in these tests."""

    rank = 0
    local_data_parallel_id = 0
    # The manager sizes the worker heartbeat array from mp_num.
    mp_num = 1
    device_id = 0
    speculative_config = {}
    # Passed as the suffix of an IPCSignal opened by the manager.
    engine_pid = "test_pid"
    cache_queue_port = 9999
    pod_ip = "127.0.0.1"
    # Passed as the suffix of worker_healthy_live_signal.
    engine_worker_queue_port = 9998
    num_cpu_blocks = 1
    num_gpu_blocks = 1
    num_layers = 1
    head_dim = 1
    kv_num_head = 1
    bytes_per_layer_per_block = 1024
    create_cache_tensor = False
|
||||
|
||||
|
||||
# ==========================
|
||||
# Test class
|
||||
# ==========================
|
||||
class TestCacheTransferManager(unittest.TestCase):
    """Tests for CacheTransferManager.check_work_status and the do_data_transfer call path.

    Every collaborator touched while constructing the manager (module logger,
    platform probe, EngineCacheQueue, IPCSignal and the CPU/GPU cache
    initialisers) is replaced by a stub, and each replacement is undone when
    the test finishes.
    """

    def _start_patch(self, patcher):
        """Start *patcher*, register its ``stop`` as a cleanup and return the patched object."""
        patched = patcher.start()
        self.addCleanup(patcher.stop)
        return patched

    def setUp(self):
        class DummyPlatform:
            """Platform probe that reports no special hardware."""

            @staticmethod
            def is_iluvatar():
                return False

            @staticmethod
            def is_xpu():
                # The test environment does not use XPU.
                return False

            @staticmethod
            def is_cuda():
                # The test environment does not use CUDA.
                return False

        class DummySignal:
            """Heartbeat stub exposing only the ``value`` list read by check_work_status."""

            def __init__(self):
                self.value = [0]

        # Module globals are patched (not overwritten) so they are restored
        # after each test and cannot leak into other test modules.
        self._start_patch(patch.object(cache_transfer_manager, "logger", MagicMock()))
        self._start_patch(patch.object(cache_transfer_manager, "current_platform", DummyPlatform()))

        # Queue and shared-memory signal classes are replaced wholesale.
        self._start_patch(patch("fastdeploy.cache_manager.cache_transfer_manager.EngineCacheQueue", new=MagicMock()))
        self._start_patch(patch("fastdeploy.cache_manager.cache_transfer_manager.IPCSignal", new=MagicMock()))

        # Skip the CPU/GPU cache initialisers.
        self._start_patch(patch.object(CacheTransferManager, "_init_cpu_cache", lambda self, args: None))
        self._start_patch(patch.object(CacheTransferManager, "_init_gpu_cache", lambda self, args: None))

        self.manager = CacheTransferManager(Args())

        # Heartbeat slot starts at 0, i.e. "no heartbeat recorded yet".
        self.manager.worker_healthy_live_signal = DummySignal()

        # Swap thread pools are never exercised for real.
        self.manager.swap_to_cpu_thread_pool = MagicMock()
        self.manager.swap_to_gpu_thread_pool = MagicMock()

        # Task queue stub: always has one task available.
        task_queue = MagicMock()
        task_queue.empty.return_value = False
        task_queue.get_transfer_task.return_value = (([0], 0, 0, MagicMock(value=0), 0), True)
        task_queue.barrier1 = MagicMock()
        task_queue.barrier2 = MagicMock()
        task_queue.barrier3 = MagicMock()
        self.manager.cache_task_queue = task_queue

        # Keep time.sleep from slowing the tests down.
        self.sleep_patch = patch("time.sleep", lambda x: None)
        self._start_patch(self.sleep_patch)

    # ==========================
    # check_work_status tests
    # ==========================
    def test_check_work_status_no_signal(self):
        healthy, msg = self.manager.check_work_status()
        self.assertTrue(healthy)
        self.assertEqual(msg, "")

    def test_check_work_status_healthy(self):
        self.manager.worker_healthy_live_signal.value[0] = int(time.time())
        healthy, msg = self.manager.check_work_status()
        self.assertTrue(healthy)
        self.assertEqual(msg, "")

    def test_check_work_status_unhealthy(self):
        self.manager.worker_healthy_live_signal.value[0] = int(time.time()) - 1000
        healthy, msg = self.manager.check_work_status(time_interval_threashold=10)
        self.assertFalse(healthy)
        self.assertIn("Not Healthy", msg)

    # ==========================
    # do_data_transfer error-handling test
    # ==========================
    def test_do_data_transfer_broken_pipe(self):
        """Set up the broken-pipe scenario and call the (stubbed) do_data_transfer once.

        NOTE(review): do_data_transfer itself is replaced by a stub to avoid its
        endless loop, so the real BrokenPipeError handling is NOT executed here.
        The previous ``assertTrue(x or True)`` checks could never fail and have
        been replaced by the one assertion this test can actually make. Covering
        the real error path needs a way to run the loop for a bounded number of
        iterations.
        """
        # Make fetching a task raise BrokenPipeError.
        self.manager.cache_task_queue.get_transfer_task.side_effect = BrokenPipeError("mock broken pipe")

        # Report the worker as unhealthy, which is the condition that ends the loop.
        self.manager.check_work_status = MagicMock(return_value=(False, "Not Healthy"))

        # Stub out do_data_transfer so the call returns immediately.
        with patch.object(self.manager, "do_data_transfer") as mock_transfer:
            mock_transfer.side_effect = lambda: None
            self.manager.do_data_transfer()

        mock_transfer.assert_called_once_with()
|
||||
|
||||
|
||||
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user