From f75697c2d13f19b54fbfc9e1551260018152c5e5 Mon Sep 17 00:00:00 2001
From: ltd0924 <32387785+ltd0924@users.noreply.github.com>
Date: Sun, 21 Sep 2025 20:41:27 +0800
Subject: [PATCH] [Feature] support clear data (#4185)

* fix

* fix

* fix

* [Feature] support clear data

* update

* fix

* fix

* fix

* fix

---
 fastdeploy/engine/common_engine.py            | 12 ++++++++++++
 .../engine/sched/resource_manager_v1.py       |  4 ++++
 fastdeploy/entrypoints/engine_client.py       |  3 +++
 fastdeploy/entrypoints/openai/api_server.py   |  1 +
 fastdeploy/entrypoints/openai/serving_chat.py |  5 +++++
 .../entrypoints/openai/serving_completion.py  |  5 ++++-
 .../inter_communicator/engine_worker_queue.py |  7 +++++++
 fastdeploy/output/token_processor.py          | 25 +++++++++++++++++++++
 fastdeploy/rl/dynamic_weight_manager.py       |  1 +
 fastdeploy/worker/gcu_model_runner.py         |  4 ++++
 fastdeploy/worker/gpu_model_runner.py         |  4 ++++
 11 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index bce3a4d48..4b1fb97eb 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -751,6 +751,18 @@ class EngineSevice:
     def check_and_free_block_tables(self):
         self.resource_manager.check_and_free_block_tables()
 
+    def clear_data(self):
+        try:
+            llm_logger.info("Clear Data: Start")
+            self.token_processor.clear_data()
+            self.engine_worker_queue.clear_data()
+            self.zmq_server.req_dict.clear()
+            llm_logger.info("Clear Data: Success")
+            return True
+        except Exception as e:
+            llm_logger.error(f"Clear data error: {e}")
+            return False
+
     def _exit_sub_services(self):
         """
         exit sub services
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index ed6e5fed1..57bb61d90 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -548,3 +548,7 @@ class ResourceManagerV1(ResourceManager):
                 del self.requests[req_id]
         except Exception as e:
             llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
+
+    def clear_data(self):
+        self.waiting: deque[Request] = deque()
+        self.to_be_rescheduled_request_id_set = set()
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index fa23aaaee..a7bd95235 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -359,3 +359,6 @@ class EngineClient:
                 return False, "clear model weight timeout"
             time.sleep(1)
         return True, ""
+
+    def check_model_weight_status(self):
+        return self.model_weights_status_signal.value[0] < 0
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 168f63484..e135202ad 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -478,6 +478,7 @@ def reset_scheduler():
 
     if llm_engine is None:
         return Response("Engine not loaded", status_code=500)
 
+    llm_engine.engine.clear_data()
     llm_engine.engine.scheduler.reset()
     return Response("Scheduler Reset Successfully", status_code=200)
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index d8356ce52..125d785fe 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -210,6 +210,8 @@
                 decoder_base_url=self.tokenizer_base_url,
             )
             while num_choices > 0:
+                if self.engine_client.check_model_weight_status():
+                    raise ValueError("Engine is clearing model weight")
model weight") try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -425,6 +427,8 @@ class OpenAIServingChat: decoder_base_url=self.tokenizer_base_url, ) while True: + if self.engine_client.check_model_weight_status(): + return ErrorResponse(code=400, message="Model weight cleared") try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -513,6 +517,7 @@ class OpenAIServingChat: if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]: choice.finish_reason = "recover_stop" + choices.append(choice) num_prompt_tokens = len(prompt_token_ids) diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index ba81afc35..9b089d073 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -216,6 +216,8 @@ class OpenAIServingCompletion: completion_batched_token_ids = [[] for _ in range(num_choices)] current_waiting_time = 0 while num_choices > 0: + if self.engine_client.check_model_weight_status(): + return ErrorResponse(message="Model weight cleared", code=400) try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -270,7 +272,6 @@ class OpenAIServingCompletion: return res except Exception as e: api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) - raise finally: self.engine_client.semaphore.release() if dealer is not None: @@ -333,6 +334,8 @@ class OpenAIServingCompletion: ) current_waiting_time = 0 while num_choices > 0: + if self.engine_client.check_model_weight_status(): + raise ValueError("Engine is clearing model weight") try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index da88265a2..9b516013d 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -392,6 +392,13 @@ class EngineWorkerQueue: llm_logger.debug("get tasks from queue success") return item + def clear_data(self): + self.lock.acquire() + self.tasks[:] = list() + self.client_read_flag[:] = [1] * self.num_client + self.lock.release() + llm_logger.info("clear data for engine worker queue") + def cleanup(self): """ Exit the worker queue gracefully. 
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 89adbe66c..12eb0c687 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -516,6 +516,31 @@
                 single_head_acceptance_rate
             )
 
+    def clear_data(self):
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.resource_manager.clear_data()
+        for i in range(self.cfg.max_num_seqs):
+            if self.resource_manager.stop_flags[i]:
+                continue
+            task = self.resource_manager.tasks_list[i]
+            result = RequestOutput(
+                request_id=task.request_id,
+                outputs=CompletionOutput(
+                    index=i,
+                    send_idx=self.tokens_counter[task.request_id],
+                    token_ids=task.eos_token_ids,
+                    draft_token_ids=[],
+                ),
+                finished=True,
+                metrics=RequestMetrics(
+                    arrival_time=time.time(),
+                    request_start_time=task.arrival_time,
+                ),
+            )
+            is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill"
+            self._recycle_resources(task.request_id, i, task, result, is_prefill)
+            llm_logger.warning(f"clear data for task {task.request_id}")
+
 
 class WarmUpTokenProcessor(TokenProcessor):
     """
diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py
index c7ccd26ac..7d8a10521 100644
--- a/fastdeploy/rl/dynamic_weight_manager.py
+++ b/fastdeploy/rl/dynamic_weight_manager.py
@@ -256,6 +256,7 @@
             model_runner.update_parameters(pid)
         elif model_weights_status.value[0] == -1:
             logger.info("infer engine stopped! start to clear checkpoint...")
+            model_runner.clear_requests()
             model_runner.clear_parameters(pid)
 
         while True:
diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py
index 07341c23b..cafae7e42 100644
--- a/fastdeploy/worker/gcu_model_runner.py
+++ b/fastdeploy/worker/gcu_model_runner.py
@@ -1199,6 +1199,10 @@ class GCUModelRunner(ModelRunnerBase):
             paddle.device.cuda.empty_cache()
         self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory")
 
+    def clear_requests(self):
+        """Used by the dynamic model loader to clear pending requests for RL."""
+        self.share_inputs["stop_flags"][:] = True
+
     def update_parameters(self, pid):
         """ " Dynamic model loader use to update parameters use for RL"""
         self.dynamic_weight_manager.update_parameters(pid)
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index 0290591b8..e34f23b16 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -1729,6 +1729,10 @@ class GPUModelRunner(ModelRunnerBase):
 
         self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory")
 
+    def clear_requests(self):
+        """Used by the dynamic model loader to clear pending requests for RL."""
+        self.share_inputs["stop_flags"][:] = True
+
     def update_parameters(self, pid):
         """Dynamic model loader use to update parameters use for RL"""
         # Update parameters
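Usage sketch (not part of the diff): with this change, hitting the api_server's
reset-scheduler endpoint first drains in-flight state via clear_data() (token
processor, engine worker queue, zmq req_dict) and then resets the scheduler,
while OpenAI handlers bail out once check_model_weight_status() reports that
weights are being cleared. A minimal client follows; the route path and port
are assumptions for illustration, not confirmed by this patch:

    # clear_engine.py -- hypothetical helper; route and port are assumed
    import requests

    def reset_scheduler(base_url: str = "http://localhost:8000") -> bool:
        """Trigger llm_engine.engine.clear_data() + scheduler.reset() server-side."""
        # Assumed route name; adjust to how reset_scheduler() is registered.
        resp = requests.post(f"{base_url}/controller/reset_scheduler")
        return resp.status_code == 200

    if __name__ == "__main__":
        print("scheduler reset:", reset_scheduler())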