diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index a6c751d12..c5ace81d6 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -801,6 +801,18 @@ class EngineSevice:
     def check_and_free_block_tables(self):
         self.resource_manager.check_and_free_block_tables()
 
+    def clear_data(self):
+        try:
+            llm_logger.info("Clear Data: Start")
+            self.token_processor.clear_data()
+            self.engine_worker_queue.clear_data()
+            self.zmq_server.req_dict.clear()
+            llm_logger.info("Clear Data: Successfully")
+            return True
+        except Exception as e:
+            llm_logger.error(f"Clear data error: {e}")
+            return False
+
     def _exit_sub_services(self):
         """
         exit sub services
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index bb9164d24..fc57df0f3 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -512,6 +512,10 @@ class ResourceManagerV1(ResourceManager):
     def finish_requests_async(self, request_ids: Union[str, Iterable[str]]):
         return self.finish_execution_pool.submit(self.finish_requests, request_ids)
 
+    def clear_data(self):
+        self.waiting: deque[Request] = deque()
+        self.to_be_rescheduled_request_id_set = set()
+
     def finish_requests(self, request_ids: Union[str, Iterable[str]]):
         llm_logger.info(f"recycle resources for requests: {request_ids}")
         try:
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index 3784dd686..2d21ad00f 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -141,6 +141,9 @@ class EngineClient:
         self.zmq_client = ZmqIpcClient(model, mode)
         self.zmq_client.connect()
 
+    def check_model_weight_status(self):
+        return self.model_weights_status_signal.value[0] < 0
+
     async def format_and_add_data(self, prompts: dict):
         """
         Format the request data and send the request to the server.
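Note (illustrative, not part of the patch): the `EngineClient.check_model_weight_status` helper added above treats any negative value of `model_weights_status_signal` as "weights unavailable". A minimal sketch of that contract, using a stand-in signal object rather than the real IPC signal:

# Stand-in for the shared weight-status signal; the real one is an IPC value,
# this class exists only for the sketch.
class _FakeSignal:
    def __init__(self, status: int):
        self.value = [status]

def check_model_weight_status(signal) -> bool:
    # Mirrors the helper above: True means the engine is clearing (or has
    # cleared) its weights, so new and in-flight streams should be aborted.
    return signal.value[0] < 0

assert check_model_weight_status(_FakeSignal(-2)) is True   # clearing / cleared
assert check_model_weight_status(_FakeSignal(0)) is False   # weights loaded, serve normally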
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 0d096aac2..6df2c148b 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -480,6 +480,7 @@ def reset_scheduler():
 
     if llm_engine is None:
         return Response("Engine not loaded", status_code=500)
+    llm_engine.engine.clear_data()
     llm_engine.engine.scheduler.reset()
     return Response("Scheduler Reset Successfully", status_code=200)
 
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index 07516bf1e..c9ce5e557 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -210,6 +210,8 @@
                 decoder_base_url=self.tokenizer_base_url,
             )
             while num_choices > 0:
+                if self.engine_client.check_model_weight_status():
+                    raise ValueError("Engine is clearing model weight")
                 try:
                     response = await asyncio.wait_for(response_queue.get(), timeout=10)
                     current_waiting_time = 0
@@ -425,6 +427,8 @@
                 decoder_base_url=self.tokenizer_base_url,
             )
             while True:
+                if self.engine_client.check_model_weight_status():
+                    raise ValueError("Engine is clearing model weight")
                 try:
                     response = await asyncio.wait_for(response_queue.get(), timeout=10)
                     current_waiting_time = 0
diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py
index dde3471b7..6388fc9c5 100644
--- a/fastdeploy/entrypoints/openai/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/serving_completion.py
@@ -216,6 +216,8 @@
             completion_batched_token_ids = [[] for _ in range(num_choices)]
             current_waiting_time = 0
             while num_choices > 0:
+                if self.engine_client.check_model_weight_status():
+                    raise ValueError("Engine is clearing model weight")
                 try:
                     response = await asyncio.wait_for(response_queue.get(), timeout=10)
                     current_waiting_time = 0
@@ -333,6 +335,8 @@
             )
             current_waiting_time = 0
             while num_choices > 0:
+                if self.engine_client.check_model_weight_status():
+                    raise ValueError("Engine is clearing model weight")
                 try:
                     response = await asyncio.wait_for(response_queue.get(), timeout=10)
                     current_waiting_time = 0
diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py
index da88265a2..9b516013d 100644
--- a/fastdeploy/inter_communicator/engine_worker_queue.py
+++ b/fastdeploy/inter_communicator/engine_worker_queue.py
@@ -392,6 +392,13 @@
         llm_logger.debug("get tasks from queue success")
         return item
 
+    def clear_data(self):
+        self.lock.acquire()
+        self.tasks[:] = list()
+        self.client_read_flag[:] = [1] * self.num_client
+        self.lock.release()
+        llm_logger.info("clear data for engine worker queue")
+
     def cleanup(self):
         """
         Exit the worker queue gracefully.
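Note (illustrative, not part of the patch): the streaming handlers above now poll the weight status at the top of each receive loop, so a stream fails fast during an RL weight clear instead of blocking on a queue that will never be fed. A minimal sketch of that guard pattern, with `engine_client` and `response_queue` as stand-ins for the handler's real objects:

import asyncio

async def stream_with_weight_guard(engine_client, response_queue):
    # Hypothetical helper showing the guard the handlers above add to their loops.
    while True:
        # Abort immediately once the engine starts clearing model weights.
        if engine_client.check_model_weight_status():
            raise ValueError("Engine is clearing model weight")
        try:
            response = await asyncio.wait_for(response_queue.get(), timeout=10)
        except asyncio.TimeoutError:
            continue  # timed out: re-check the weight status, then wait again
        yield response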
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 7573ce024..0851dee63 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -464,6 +464,31 @@
         main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time)
         main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id])
 
+    def clear_data(self):
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.resource_manager.clear_data()
+        for i in range(self.cfg.max_num_seqs):
+            if self.resource_manager.stop_flags[i]:
+                continue
+            task = self.resource_manager.tasks_list[i]
+            result = RequestOutput(
+                request_id=task.request_id,
+                outputs=CompletionOutput(
+                    index=i,
+                    send_idx=self.tokens_counter[task.request_id],
+                    token_ids=task.eos_token_ids,
+                    draft_token_ids=[],
+                ),
+                finished=True,
+                metrics=RequestMetrics(
+                    arrival_time=time.time(),
+                    request_start_time=task.arrival_time,
+                ),
+            )
+            is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill"
+            self._recycle_resources(task.request_id, i, task, result, is_prefill)
+            llm_logger.warning(f"clear data for task {task.request_id}")
+
     def _record_speculative_decoding_mertics(self, accept_num):
         """Record metrics of speculative decoding"""
         if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py
index 2452f0d35..55f48c4b9 100644
--- a/fastdeploy/rl/dynamic_weight_manager.py
+++ b/fastdeploy/rl/dynamic_weight_manager.py
@@ -228,6 +228,7 @@
                 logger.info("finished loading new checkpoint")
             elif model_weights_status.value[0] == ModelWeightsStatus.CLEARING:
                 logger.info("infer engine stopped! start to clear checkpoint...")
+                model_runner.clear_requests()
                 model_runner.clear_parameters(pid)
                 while model_weights_status.value[0] != ModelWeightsStatus.CLEARED:
                     time.sleep(0.01)
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index cf4177531..3c8512009 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -1704,6 +1704,10 @@
         self.forward_meta.clear_caches()
         paddle.device.cuda.empty_cache()
 
+    def clear_requests(self):
+        """Dynamic model loader uses this to clear requests for RL"""
+        self.share_inputs["stop_flags"][:] = True
+
     def clear_parameters(self, pid):
         """Dynamic model loader use to clear parameters use for RL"""
         # Clear CUDAGraph
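Note (illustrative, not part of the patch): taken together, the hunks above give the RL weight-clear path a fixed order of operations on the worker side. The helper below is an assumption distilled from the diff, not an existing FastDeploy API:

import time

def handle_clearing_status(model_runner, pid, model_weights_status, ModelWeightsStatus):
    # 1. Stop all in-flight requests on the worker
    #    (GPUModelRunner.clear_requests sets share_inputs["stop_flags"][:] = True).
    model_runner.clear_requests()
    # 2. Release the model parameters themselves.
    model_runner.clear_parameters(pid)
    # 3. Block until the coordinator reports the CLEARED state.
    while model_weights_status.value[0] != ModelWeightsStatus.CLEARED:
        time.sleep(0.01)
    # On the engine side, EngineSevice.clear_data() meanwhile drains the token
    # processor, the engine worker queue, and the ZMQ request map (first hunks above),
    # and TokenProcessor.clear_data() recycles each live task with a final finished output.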