diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index ff3ae3eb4..e512b62d6 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -16,12 +16,12 @@ import inspect import os -import threading import time import traceback import uuid import numpy as np +from filelock import FileLock from fastdeploy import envs from fastdeploy.config import ModelConfig @@ -137,7 +137,7 @@ class EngineClient: pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50)) ) self.connection_initialized = False - self.clear_update_lock = threading.Lock() + self.clear_update_lock = FileLock(f"/tmp/fd_weight_clear_update_lock__pid{pid}_port{port}.lock") def create_zmq_client(self, model, mode): """ @@ -337,7 +337,9 @@ class EngineClient: if self.model_weights_status_signal.value[0] == ModelWeightsStatus.NORMAL: return True, "" if self.model_weights_status_signal.value[0] == ModelWeightsStatus.UPDATING: - return False, "updating model weight already" + return False, "worker is updating model weight already" + if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARING: + return False, "worker is clearing model weight, cannot update now" self.model_weights_status_signal.value[0] = ModelWeightsStatus.UPDATING if self.enable_prefix_caching or self.enable_splitwise: @@ -381,7 +383,9 @@ class EngineClient: if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARED: return True, "" if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARING: - return False, "clearing model weight already" + return False, "worker is clearing model weight already" + if self.model_weights_status_signal.value[0] == ModelWeightsStatus.UPDATING: + return False, "worker is updating model weight, cannot clear now" self.model_weights_status_signal.value[0] = ModelWeightsStatus.CLEARING if self.enable_prefix_caching or self.enable_splitwise: diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index 95a274a1e..952b65352 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -217,26 +217,17 @@ class DynamicWeightManager: check model weights status """ logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}") - is_stop = 0 while model_weights_status.value[0] != ModelWeightsStatus.NORMAL: if model_weights_status.value[0] == ModelWeightsStatus.UPDATING: logger.info("infer engine stopped! start to load new checkpoint...") model_runner.update_parameters(pid) + while model_weights_status.value[0] != ModelWeightsStatus.NORMAL: + time.sleep(0.01) + logger.info("finished loading new checkpoint") elif model_weights_status.value[0] == ModelWeightsStatus.CLEARING: logger.info("infer engine stopped! start to clear checkpoint...") model_runner.clear_parameters(pid) - elif model_weights_status.value[0] == ModelWeightsStatus.CLEARED: - while True: - if model_weights_status.value[0] == ModelWeightsStatus.NORMAL: - logger.info("finished loading new checkpoint") - break - elif is_stop == 1 or ( - model_weights_status.value[0] == ModelWeightsStatus.CLEARED and is_stop == 0 - ): - if is_stop == 0: - logger.info("finished clearing checkpoint") - is_stop = 1 - time.sleep(0.001) - break - else: - time.sleep(0.001) + while model_weights_status.value[0] != ModelWeightsStatus.CLEARED: + time.sleep(0.01) + logger.info("finished clearing checkpoint") + time.sleep(0.01)