From 87c7c0d85274bcda3471e6e09f3adbb3a6087efa Mon Sep 17 00:00:00 2001 From: chenjian <1435317881@qq.com> Date: Fri, 14 Nov 2025 12:07:02 +0800 Subject: [PATCH] fix local scheduler bug (#5020) --- fastdeploy/scheduler/dp_scheduler.py | 2 +- fastdeploy/scheduler/local_scheduler.py | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/fastdeploy/scheduler/dp_scheduler.py b/fastdeploy/scheduler/dp_scheduler.py index f67b684ab..05ee40487 100644 --- a/fastdeploy/scheduler/dp_scheduler.py +++ b/fastdeploy/scheduler/dp_scheduler.py @@ -55,13 +55,13 @@ class DPLocalScheduler(LocalScheduler): results: List of RequestOutput objects containing results """ responses: List[ScheduledResponse] = [ScheduledResponse(result) for result in results] - self.batch_responses_per_step.append([response.raw for response in responses]) finished_responses = [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: self.scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") with self.mutex: + self.batch_responses_per_step.append([response.raw for response in responses]) for response in responses: if response.request_id not in self.responses: self.responses[response.request_id] = [response] diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index b246ca09c..88d918ab8 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -79,6 +79,7 @@ class LocalScheduler: self.requests: Dict[str, ScheduledRequest] = dict() self.responses: Dict[str, List[ScheduledResponse]] = dict() + self.batch_responses_per_step: List[List[ScheduledResponse]] = list() self.wait_request_timeout = 10 self.wait_response_timeout = 0.001 @@ -299,6 +300,7 @@ class LocalScheduler: scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") with self.mutex: + self.batch_responses_per_step.append([response.raw for response in responses]) for response in responses: if response.request_id not in self.requests: scheduler_logger.warning(f"Scheduler has received a expired response: {[response.request_id]}") @@ -337,11 +339,15 @@ class LocalScheduler: def _get_results(): responses = self.responses + batch_responses_per_step = self.batch_responses_per_step self.responses = dict() - return responses + self.batch_responses_per_step = list() + return responses, batch_responses_per_step with self.responses_not_empty: - responses = self.responses_not_empty.wait_for(_get_results, self.wait_response_timeout) + responses, batch_responses_per_step = self.responses_not_empty.wait_for( + _get_results, self.wait_response_timeout + ) results = dict() for request_id, resps in responses.items(): @@ -354,4 +360,7 @@ class LocalScheduler: if finished: self._recycle(request_id) scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}") - return results + if envs.FD_ENABLE_INTERNAL_ADAPTER: + return batch_responses_per_step + else: + return results