diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index c5ace81d6..3b77737d9 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -806,7 +806,8 @@ class EngineSevice:
             llm_logger.info("Clear Data: Start")
             self.token_processor.clear_data()
             self.engine_worker_queue.clear_data()
-            self.zmq_server.req_dict.clear()
+            self.send_response_server.req_dict.clear()
+            self.recv_request_server.req_dict.clear()
             llm_logger.info("Clear Data: Successfully")
             return True
         except Exception as e:
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 6df2c148b..ba3231abc 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -499,6 +499,7 @@ def control_scheduler(request: ControlSchedulerRequest):
         return JSONResponse(content=content.model_dump(), status_code=500)

     if request.reset:
+        llm_engine.engine.clear_data()
         llm_engine.engine.scheduler.reset()

     if request.load_shards_num or request.reallocate_shard:
diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py
index 6af0bfecc..8b0f92085 100644
--- a/fastdeploy/rl/dynamic_weight_manager.py
+++ b/fastdeploy/rl/dynamic_weight_manager.py
@@ -223,6 +223,7 @@ class DynamicWeightManager:
         while model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
             if model_weights_status.value[0] == ModelWeightsStatus.UPDATING:
                 logger.info("infer engine stopped! start to load new checkpoint...")
+                model_runner.clear_requests()
                 model_runner.update_parameters(pid)
                 while model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
                     time.sleep(0.01)
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 856a58ef9..2b2ed9a59 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -337,6 +337,8 @@ class PaddleDisWorkerProc:
                         self.worker.model_runner,
                         self.parallel_config.engine_worker_queue_port,
                     )
+                    logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
+                    self.task_queue.clear_data()
                     self.model_weights_signal[0] = ModelWeightsStatus.NORMAL
                     logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.")
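
Taken together, these hunks make both the scheduler-reset path and the RL weight-update path drop stale request state before new work is accepted. The sketch below is not part of the PR: it shows one way the updated `EngineSevice.clear_data()` could be exercised with `unittest.mock`, under the assumption that the method only touches the attributes visible in the first hunk; the test name and the `__new__`-based construction are illustrative choices, not FastDeploy conventions.

```python
# Hypothetical test sketch (not from the PR). Assumes EngineSevice.clear_data()
# only uses the four attributes shown in the hunk above; if the real method
# touches more state, those attributes would need to be stubbed as well.
from unittest.mock import MagicMock

from fastdeploy.engine.common_engine import EngineSevice


def test_clear_data_clears_both_zmq_request_dicts():
    # Bypass __init__ so no real queues or ZMQ sockets are created.
    service = EngineSevice.__new__(EngineSevice)
    service.token_processor = MagicMock()
    service.engine_worker_queue = MagicMock()
    service.send_response_server = MagicMock()
    service.recv_request_server = MagicMock()

    assert service.clear_data() is True

    service.token_processor.clear_data.assert_called_once()
    service.engine_worker_queue.clear_data.assert_called_once()
    # Both ZMQ-side request dicts must be emptied, not just the old zmq_server one.
    service.send_response_server.req_dict.clear.assert_called_once()
    service.recv_request_server.req_dict.clear.assert_called_once()
```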