diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 06ff8fe1b..5eff092df 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -31,7 +31,12 @@ from fastdeploy.engine.pooling_params import PoolingParams from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.entrypoints.openai.protocol import ToolCall from fastdeploy.utils import data_processor_logger -from fastdeploy.worker.output import LogprobsLists, PromptLogprobs, SampleLogprobs +from fastdeploy.worker.output import ( + LogprobsLists, + PromptLogprobs, + SampleLogprobs, + SpeculateMetrics, +) class RequestStatus(Enum): @@ -402,6 +407,7 @@ class CompletionOutput: text: Optional[str] = None reasoning_content: Optional[str] = None tool_calls: Optional[ToolCall] = None + speculate_metrics: Optional[SpeculateMetrics] = None def to_dict(self): """ @@ -475,6 +481,7 @@ class RequestMetrics: llm_engine_recv_req_timestamp: Optional[float] = None llm_engine_send_req_to_engine_timestamp: Optional[float] = None llm_engine_recv_token_timestamp: Optional[float] = None + speculate_metrics: Optional[SpeculateMetrics] = None def to_dict(self): """ @@ -594,6 +601,8 @@ class RequestOutput: self.outputs.draft_top_logprobs.sampled_token_ranks.extend( next_output.outputs.draft_top_logprobs.sampled_token_ranks ) + if next_output.metrics.speculate_metrics is not None: + self.outputs.speculate_metrics = next_output.metrics.speculate_metrics def __repr__(self) -> str: return ( diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 33d12a123..02b78773f 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -31,7 +31,7 @@ from pydantic import ( ) from fastdeploy.engine.pooling_params import PoolingParams -from fastdeploy.worker.output import PromptLogprobs +from fastdeploy.worker.output import PromptLogprobs, SpeculateMetrics class InvalidParameterException(Exception): @@ -230,6 +230,7 @@ class ChatCompletionResponseChoice(BaseModel): draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] + speculate_metrics: Optional[SpeculateMetrics] = None class ChatCompletionResponse(BaseModel): @@ -295,6 +296,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): prompt_logprobs: Optional[PromptLogprobs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None arrival_time: Optional[float] = None + speculate_metrics: Optional[SpeculateMetrics] = None class ChatCompletionStreamResponse(BaseModel): @@ -329,6 +331,7 @@ class CompletionResponseChoice(BaseModel): reasoning_content: Optional[str] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None + speculate_metrics: Optional[SpeculateMetrics] = None class CompletionResponse(BaseModel): @@ -374,6 +377,7 @@ class CompletionResponseStreamChoice(BaseModel): reasoning_content: Optional[str] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None + speculate_metrics: Optional[SpeculateMetrics] = None class CompletionStreamResponse(BaseModel): diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 0f1cfb000..b9daa74fb 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ 
b/fastdeploy/entrypoints/openai/serving_chat.py @@ -57,6 +57,7 @@ from fastdeploy.worker.output import ( LogprobsLists, LogprobsTensors, PromptLogprobs, + SpeculateMetrics, ) NONES = itertools.repeat(None) @@ -387,6 +388,8 @@ class OpenAIServingChat: request.include_logprobs_decode_token, ) + output_speculate_metrics = res["metrics"].get("speculate_metrics", None) + delta_message = DeltaMessage( reasoning_content="", prompt_token_ids=None, @@ -418,6 +421,7 @@ class OpenAIServingChat: logprobs=logprobs_res, draft_logprobs=draft_logprobs_res, arrival_time=arrival_time, + speculate_metrics=output_speculate_metrics, ) if res["finished"]: num_choices -= 1 @@ -536,6 +540,7 @@ class OpenAIServingChat: decoder_base_url=self.tokenizer_base_url, ) prompt_logprobs_res_list = [[] for _ in range(num_choices)] + speculate_metrics = [None for _ in range(num_choices)] choices = [] while num_choices > 0: if self.engine_client.check_model_weight_status(): @@ -613,6 +618,7 @@ class OpenAIServingChat: ) if prompt_logprobs_res: prompt_logprobs_res_list[idx].extend(clamp_prompt_logprobs(prompt_logprobs_res)) + speculate_metrics[idx] = data["metrics"].get("speculate_metrics", None) if data["finished"]: num_choices -= 1 reasoning_num_tokens[idx] = data["outputs"].get("reasoning_token_num", 0) @@ -635,6 +641,7 @@ class OpenAIServingChat: response_processor=response_processor, prompt_logprobs_res_list=prompt_logprobs_res_list, max_tokens=max_tokens, + speculate_metrics=speculate_metrics[idx], ) choices.append(choice) finally: @@ -688,6 +695,7 @@ class OpenAIServingChat: prompt_logprobs_res_list: list, response_processor: ChatResponseProcessor, max_tokens: int, + speculate_metrics: SpeculateMetrics | None, ) -> ChatCompletionResponseChoice: idx = int(data["request_id"].split("_")[-1]) output = data["outputs"] @@ -745,6 +753,7 @@ class OpenAIServingChat: draft_logprobs=draft_logprobs_full_res, prompt_logprobs=prompt_logprobs_full_res, finish_reason=finish_reason, + speculate_metrics=speculate_metrics, ) def _create_chat_logprobs( diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index ac9d390cf..fd4b95995 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -261,6 +261,7 @@ class OpenAIServingCompletion: aggregated_token_ids = [[] for _ in range(num_choices)] aggregated_prompt_logprobs_tensors = [None] * num_choices completion_batched_token_ids = [[] for _ in range(num_choices)] + aggregated_speculate_metrics = [None] * num_choices current_waiting_time = 0 while num_choices > 0: if self.engine_client.check_model_weight_status(): @@ -315,12 +316,18 @@ class OpenAIServingCompletion: ) output_tokens[rid] += len(data["outputs"]["token_ids"]) completion_batched_token_ids[rid].extend(data["outputs"]["token_ids"]) + + output_speculate_metrics = data["metrics"].get("speculate_metrics", None) + if output_speculate_metrics is not None: + aggregated_speculate_metrics[rid] = output_speculate_metrics + if data.get("finished", False): data["output_token_ids"] = output_tokens[rid] data["outputs"]["top_logprobs"] = aggregated_top_logprobs[rid] data["outputs"]["draft_top_logprobs"] = aggregated_draft_top_logprobs[rid] data["outputs"]["token_ids"] = aggregated_token_ids[rid] data["prompt_logprobs_tensors"] = aggregated_prompt_logprobs_tensors[rid] + data["speculate_metrics"] = aggregated_speculate_metrics[rid] valid_results[rid] = data num_choices -= 1 break @@ -512,6 +519,7 @@ class 
OpenAIServingCompletion: output_tokens[idx] += output.get("num_image_tokens") num_image_tokens[idx] += output.get("num_image_tokens") reasoning_tokens[idx] += output.get("reasoning_token_num", 0) + output_speculate_metrics = res["metrics"].get("speculate_metrics", None) delta_message = CompletionResponseStreamChoice( index=idx, text=output["text"], @@ -524,6 +532,7 @@ class OpenAIServingCompletion: logprobs=logprobs_res, prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res), draft_logprobs=draft_logprobs_res, + speculate_metrics=output_speculate_metrics, ) if not res["finished"] and "delta_message" in output: delta_message_output = output["delta_message"] @@ -683,6 +692,7 @@ class OpenAIServingCompletion: draft_logprobs=aggregated_draft_logprobs, prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res), finish_reason=finish_reason, + speculate_metrics=final_res["metrics"].get("speculate_metrics", None), ) choices.append(choice_data) diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index ec89e8383..4da49a966 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -143,9 +143,9 @@ class MetricsManager: request_success_total: "Counter" spec_decode_draft_acceptance_rate: "Gauge" spec_decode_efficiency: "Gauge" - spec_decode_num_accepted_tokens_total: "Counter" + spec_decode_num_accepted_tokens_total: "Gauge" spec_decode_num_draft_tokens_total: "Counter" - spec_decode_num_emitted_tokens_total: "Counter" + spec_decode_num_emitted_tokens_total: "Gauge" spec_decode_draft_single_head_acceptance_rate: "list[Gauge]" # for YIYAN Adapter @@ -598,13 +598,13 @@ class MetricsManager: "kwargs": {}, }, "spec_decode_num_accepted_tokens_total": { - "type": Counter, + "type": Gauge, "name": "fastdeploy:spec_decode_num_accepted_tokens_total", "description": "Total number of tokens accepted by the scoring model and verification program", "kwargs": {}, }, "spec_decode_num_emitted_tokens_total": { - "type": Counter, + "type": Gauge, "name": "fastdeploy:spec_decode_num_emitted_tokens_total", "description": "Total number of tokens output by the entire system", "kwargs": {}, diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 406252fd4..109df4d2c 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -35,6 +35,7 @@ from fastdeploy.engine.request import ( Request, RequestMetrics, RequestOutput, + SpeculateMetrics, ) from fastdeploy.inter_communicator import ZmqIpcServer from fastdeploy.metrics.metrics import main_process_metrics @@ -112,16 +113,13 @@ class TokenProcessor: self.num_accepted_tokens = 0 self.num_emitted_tokens = 0 self.max_num_emitted_tokens = 0 - self.num_rest_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS - self.num_accept_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS self.executor = ThreadPoolExecutor(max_workers=1) self.prefill_result_status = dict() self._finalizer = weakref.finalize(self, self._cleanup_resources) self._batch_result_buffer = None + self.total_step_per_request = {} + self.accept_token_num_per_head_per_request = {} + self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS def _cleanup_resources(self): """Cleaning up shared memory resources""" @@ -506,7 +504,7 @@ class TokenProcessor: if task_id in self.tokens_counter: del self.tokens_counter[task_id] - def _compute_speculative_status(self): + def _compute_speculative_status(self, result: RequestOutput): # TODO(liuzichang): Supplement more statistics interval = 1 if 
self.speculative_stats_step % interval == 0: @@ -519,13 +517,11 @@ class TokenProcessor: if self.cfg.speculative_config.method in ["mtp"]: single_head_acceptance_rates = [] - for head in range(self.cfg.speculative_config.num_speculative_tokens): - if self.num_rest_requests_per_head[head] != 0: + for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): + if self.accept_token_num_per_head[i - 1] != 0: single_head_acceptance_rates.append( - self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head] + self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - else: - single_head_acceptance_rates.append(0) spec_logger.info(f" Single head accept ratio: {single_head_acceptance_rates}") if self.number_of_output_tokens > 1000000: @@ -533,6 +529,43 @@ class TokenProcessor: self.total_step = 0 self.speculative_stats_step += 1 + # Per-request speculative metrics + req_id = result.request_id + accept_num_list = self.accept_token_num_per_head_per_request[req_id] + req_total_step = self.total_step_per_request[req_id] + req_total_draft_tokens = req_total_step * (self.cfg.speculative_config.num_speculative_tokens + 1) + req_accepted_tokens = sum(accept_num_list) + req_rejected_tokens = req_total_draft_tokens - req_accepted_tokens + req_accept_ratio = 1 - req_total_step / req_accepted_tokens + req_avg_accept_length = req_accepted_tokens / req_total_step + + accept_ratio_per_head = [] + for i in range(1, len(accept_num_list)): + if accept_num_list[i - 1] != 0: + accept_ratio_per_head.append(accept_num_list[i] / accept_num_list[i - 1]) + else: + accept_ratio_per_head.append(0) + + result.metrics.speculate_metrics = SpeculateMetrics( + accepted_tokens=req_accepted_tokens, + rejected_tokens=req_rejected_tokens, + accept_ratio=req_accept_ratio, + average_accept_length=req_avg_accept_length, + accept_ratio_per_head=accept_ratio_per_head[: self.cfg.speculative_config.num_speculative_tokens], + ) + + # Log + spec_logger.debug( + f"req_id: {result.request_id}, total_step: {req_total_step}, " + f"accept_ratio: {req_accept_ratio}, average_accept_length: {req_avg_accept_length}, " + f"accepted_tokens: {req_accepted_tokens}, rejected_tokens: {req_rejected_tokens}, " + f"accept_ratio_per_head: {accept_ratio_per_head}" + ) + + # Clear request record + self.accept_token_num_per_head_per_request.pop(req_id) + self.total_step_per_request.pop(req_id) + def _process_batch_draft_tokens(self, mtype, batch, accept_num, tokens, scores, ranks): """ Process batch draft tokens and generate corresponding request outputs @@ -620,7 +653,7 @@ class TokenProcessor: else: batch = self.output_tokens[1] accept_num = tokens[2 : batch + 2] - self._record_speculative_decoding_mertics(accept_num) + elif self.use_logprobs: batch = self.output_tokens[1, 0] tokens = tokens[2 : batch * (K + 1) + 2].reshape([batch, K + 1])[:, : (K + 1)] @@ -642,6 +675,7 @@ class TokenProcessor: task_id = task.request_id if self.cfg.speculative_config.method: + self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i]) if accept_num[i] == -3: recovery_stop = True if recovery_stop: @@ -792,7 +826,7 @@ class TokenProcessor: ) llm_logger.info(f"{self.resource_manager.info()}") if self.cfg.speculative_config.method: - self._compute_speculative_status() + self._compute_speculative_status(result) if not is_prefill: self._record_completion_metrics(task, current_time) self._recycle_resources(task_id, i, task, result, is_prefill) @@ -801,6 +835,8 @@ class TokenProcessor: llm_logger.debug(f"get response from infer: {result}") 
batch_result.append(result) + if self.cfg.speculative_config.method: + self._record_speculative_decoding_metrics(accept_num) self.postprocess(batch_result, mtype) def _record_metrics(self, task, current_time, token_ids): @@ -834,7 +870,7 @@ class TokenProcessor: main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time) main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id]) - def _record_speculative_decoding_mertics(self, accept_num): + def _record_speculative_decoding_metrics(self, accept_num): """Record metrics of speculative decoding""" if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"): main_process_metrics._init_speculative_metrics( @@ -843,15 +879,13 @@ class TokenProcessor: ) real_accept_num = [x for x in accept_num if x > 0] - num_accepted_tokens = sum([x - 1 for x in real_accept_num]) - self.num_accepted_tokens += num_accepted_tokens - num_emitted_tokens = sum(real_accept_num) - if num_emitted_tokens == 0: + self.num_accepted_tokens = sum(self.accept_token_num_per_head[1:]) + self.num_emitted_tokens = sum(self.accept_token_num_per_head) + if self.num_emitted_tokens == 0: return - self.num_emitted_tokens += num_emitted_tokens - main_process_metrics.spec_decode_num_accepted_tokens_total.inc(num_accepted_tokens) - main_process_metrics.spec_decode_num_emitted_tokens_total.inc(num_emitted_tokens) + main_process_metrics.spec_decode_num_accepted_tokens_total.set(self.num_accepted_tokens) + main_process_metrics.spec_decode_num_emitted_tokens_total.set(self.num_emitted_tokens) if self.cfg.speculative_config.method in ["ngram"]: main_process_metrics.spec_decode_draft_acceptance_rate.set( @@ -872,25 +906,26 @@ class TokenProcessor: main_process_metrics.spec_decode_efficiency.set(self.num_emitted_tokens / self.max_num_emitted_tokens) main_process_metrics.spec_decode_num_draft_tokens_total.inc(num_draft_tokens) - num_rest_requests = len(real_accept_num) - for head in range(self.cfg.speculative_config.num_speculative_tokens): - num_accept_requests = len([x for x in real_accept_num if x >= head + 2]) - # Accumulate the number of requests for each head - self.num_accept_requests_per_head[head] += num_accept_requests - self.num_rest_requests_per_head[head] += num_rest_requests - # Update the rest requests for each head - num_rest_requests = num_accept_requests - # Calculate the acceptance rate for each head - if self.num_rest_requests_per_head[head] != 0: + for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): + if self.accept_token_num_per_head[i - 1] != 0: single_head_acceptance_rate = ( - self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head] + self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - else: - single_head_acceptance_rate = 0 - main_process_metrics.spec_decode_draft_single_head_acceptance_rate[head].set( + main_process_metrics.spec_decode_draft_single_head_acceptance_rate[i - 1].set( single_head_acceptance_rate ) + def _record_speculative_decoding_accept_num_per_request(self, req_id, accept_num): + if req_id not in self.total_step_per_request: + self.total_step_per_request[req_id] = 0 + if req_id not in self.accept_token_num_per_head_per_request: + self.accept_token_num_per_head_per_request[req_id] = [0] * MAX_DRAFT_TOKENS + + self.total_step_per_request[req_id] += 1 + for i in range(accept_num): + self.accept_token_num_per_head_per_request[req_id][i] += 1 + self.accept_token_num_per_head[i] += 1 + def clear_data(self): 
if envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.clear_data() diff --git a/fastdeploy/worker/output.py b/fastdeploy/worker/output.py index 2b66ce4e1..3b1096244 100644 --- a/fastdeploy/worker/output.py +++ b/fastdeploy/worker/output.py @@ -128,6 +128,38 @@ class LogprobsTensors(NamedTuple): PromptLogprobs = LogprobsTensors | list[dict[int, Logprob] | None] +@dataclass +class SpeculateMetrics: + """ + Speculative decoding metrics + """ + + """ + The number of accepted tokens in the current request + """ + accepted_tokens: int + + """ + The number of rejected tokens in the current request + """ + rejected_tokens: int + + """ + The acceptance rate of the current request + """ + accept_ratio: float + + """ + Average number of accepted tokens per step for the current request + """ + average_accept_length: float + + """ + Average acceptance rate of each head in the current request + """ + accept_ratio_per_head: list[float] + + @dataclass class SamplerOutput: """ """ diff --git a/tests/entrypoints/openai/test_completion_echo.py b/tests/entrypoints/openai/test_completion_echo.py index 679f6d8ec..087d159d7 100644 --- a/tests/entrypoints/openai/test_completion_echo.py +++ b/tests/entrypoints/openai/test_completion_echo.py @@ -46,6 +46,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): "finished": True, }, "output_token_ids": 3, + "metrics": {}, } self.mock_engine.generate.return_value = [mock_output] @@ -80,6 +81,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): "finished": True, }, "output_token_ids": 3, + "metrics": {}, } self.mock_engine.generate.return_value = [mock_output] @@ -109,10 +111,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): { "outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, { "outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, ] self.mock_engine.generate.return_value = mock_outputs @@ -146,10 +150,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): { "outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, { "outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, ] self.mock_engine.generate.return_value = mock_outputs diff --git a/tests/entrypoints/openai/test_max_streaming_tokens.py b/tests/entrypoints/openai/test_max_streaming_tokens.py index 48935cba8..ed11226c3 100644 --- a/tests/entrypoints/openai/test_max_streaming_tokens.py +++ b/tests/entrypoints/openai/test_max_streaming_tokens.py @@ -301,6 +301,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): ], }, "finished": True, + "metrics": {}, }, { "request_id": "test_request_id_1", @@ -314,6 +315,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): ], }, "finished": True, + "metrics": {}, }, ] @@ -473,6 +475,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): prompt_logprobs_res_list=prompt_logprobs_res_list, response_processor=mock_response_processor, max_tokens=max_tokens_list[idx], + speculate_metrics=None, ) expected = case["expected"] diff --git a/tests/entrypoints/openai/test_serving_completion.py b/tests/entrypoints/openai/test_serving_completion.py index 680d775bd..761213d1d 100644 --- a/tests/entrypoints/openai/test_serving_completion.py +++ 
b/tests/entrypoints/openai/test_serving_completion.py @@ -129,6 +129,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase): "reasoning_token_num": 10, }, "output_token_ids": 3, + "metrics": {}, }, { "outputs": { @@ -141,6 +142,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase): "reasoning_token_num": 20, }, "output_token_ids": 3, + "metrics": {}, }, ] diff --git a/tests/output/test_process_batch_output.py b/tests/output/test_process_batch_output.py index 6dd8f5135..ab964efb6 100644 --- a/tests/output/test_process_batch_output.py +++ b/tests/output/test_process_batch_output.py @@ -138,13 +138,10 @@ class TestTokenProcessorProcessBatchOutput(unittest.TestCase): processor.num_accepted_tokens = 0 processor.num_emitted_tokens = 0 processor.max_num_emitted_tokens = 0 - processor.num_rest_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS - processor.num_accept_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS processor.speculative_stats_step = 0 + processor.total_step_per_request = {} + processor.accept_token_num_per_head_per_request = {} + processor.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS # processor._recycle_resources = Mock()