[fix] fix clear/update lock not working when workers > 1

This commit is contained in:
liyonghua0910
2025-09-17 18:53:58 +08:00
parent cfa0982aae
commit 33f209086b
2 changed files with 15 additions and 20 deletions

View File

@@ -16,12 +16,12 @@
import inspect
import os
import threading
import time
import traceback
import uuid
import numpy as np
from filelock import FileLock
from fastdeploy import envs
from fastdeploy.config import ModelConfig
@@ -137,7 +137,7 @@ class EngineClient:
pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50))
)
self.connection_initialized = False
self.clear_update_lock = threading.Lock()
self.clear_update_lock = FileLock(f"/tmp/fd_weight_clear_update_lock__pid{pid}_port{port}.lock")
def create_zmq_client(self, model, mode):
"""
@@ -337,7 +337,9 @@ class EngineClient:
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.NORMAL:
return True, ""
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.UPDATING:
return False, "updating model weight already"
return False, "worker is updating model weight already"
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARING:
return False, "worker is clearing model weight, cannot update now"
self.model_weights_status_signal.value[0] = ModelWeightsStatus.UPDATING
if self.enable_prefix_caching or self.enable_splitwise:
@@ -381,7 +383,9 @@ class EngineClient:
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARED:
return True, ""
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARING:
return False, "clearing model weight already"
return False, "worker is clearing model weight already"
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.UPDATING:
return False, "worker is updating model weight, cannot clear now"
self.model_weights_status_signal.value[0] = ModelWeightsStatus.CLEARING
if self.enable_prefix_caching or self.enable_splitwise:

View File

@@ -217,26 +217,17 @@ class DynamicWeightManager:
check model weights status
"""
logger.info(f"dynamic weight manager is check model weights status! {model_weights_status.value[0]}")
is_stop = 0
while model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
if model_weights_status.value[0] == ModelWeightsStatus.UPDATING:
logger.info("infer engine stopped! start to load new checkpoint...")
model_runner.update_parameters(pid)
while model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
time.sleep(0.01)
logger.info("finished loading new checkpoint")
elif model_weights_status.value[0] == ModelWeightsStatus.CLEARING:
logger.info("infer engine stopped! start to clear checkpoint...")
model_runner.clear_parameters(pid)
elif model_weights_status.value[0] == ModelWeightsStatus.CLEARED:
while True:
if model_weights_status.value[0] == ModelWeightsStatus.NORMAL:
logger.info("finished loading new checkpoint")
break
elif is_stop == 1 or (
model_weights_status.value[0] == ModelWeightsStatus.CLEARED and is_stop == 0
):
if is_stop == 0:
logger.info("finished clearing checkpoint")
is_stop = 1
time.sleep(0.001)
break
else:
time.sleep(0.001)
while model_weights_status.value[0] != ModelWeightsStatus.CLEARED:
time.sleep(0.01)
logger.info("finished clearing checkpoint")
time.sleep(0.01)