"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

from collections.abc import Sequence
from typing import Optional, Union

from fastdeploy.entrypoints.openai.protocol import ChatCompletionRequest, DeltaMessage
from fastdeploy.reasoning import ReasoningParser, ReasoningParserManager


@ReasoningParserManager.register_module("ernie-45-vl-thinking")
class Ernie45VLThinkingReasoningParser(ReasoningParser):
    """
    Reasoning parser for ernie_vl model.

    The ernie_vl model uses <think>...</think> tokens to denote reasoning text
    within its output. The model provides a strict switch to disable reasoning
    output via the 'enable_thinking=False' parameter. This parser extracts the
    reasoning content enclosed by <think> and </think> tokens from the model's
    output.

    Every parsing method takes a ``model_status`` string, one of the values of
    ``token_status_mapping`` ("think_start", "think_end", "tool_call_start",
    "tool_call_end"), as produced by :meth:`get_model_status`.
    """

    def __init__(self, tokenizer):
        """
        Resolve the special tokens against the tokenizer vocabulary.

        For each special token this sets two attributes on the instance:
        ``<name>`` (the token text) and ``<name>_id`` (its vocabulary id), and
        finally builds ``token_status_mapping`` (token id -> status string).

        Args:
            tokenizer: Tokenizer forwarded to the ``ReasoningParser`` base
                class, which is expected to expose it as
                ``self.model_tokenizer`` together with ``self.vocab``.

        Raises:
            ValueError: If ``self.model_tokenizer`` is falsy after base
                initialisation.
            RuntimeError: If any special token is absent from ``self.vocab``.
        """
        super().__init__(tokenizer)

        # NOTE(review): the literals below were empty strings in the reviewed
        # text (markup-looking tokens had been stripped). They are restored to
        # the tags implied by the attribute names and the class docstring;
        # confirm against the model's tokenizer vocabulary.
        token_definitions = {
            "think_start_token": "<think>",
            "think_end_token": "</think>",
            "tool_call_start_token": "<tool_call>",
            "tool_call_end_token": "</tool_call>",
        }

        if not self.model_tokenizer:
            raise ValueError(
                "The model tokenizer must be passed to the ReasoningParser "
                "constructor during construction."
            )

        missing_tokens = []
        for name, token_value in token_definitions.items():
            token_id = self.vocab.get(token_value)
            setattr(self, name, token_value)
            setattr(self, f"{name}_id", token_id)
            if token_id is None:
                # ``name`` already ends in "_token"; do not append the word again.
                missing_tokens.append(name.replace("_", " "))

        if missing_tokens:
            raise RuntimeError(
                "ernie vl reasoning parser could not find the following token ids "
                f"in tokenizer vocabulary: {', '.join(missing_tokens)}"
            )

        self.token_status_mapping = {
            self.think_start_token_id: "think_start",
            self.think_end_token_id: "think_end",
            self.tool_call_start_token_id: "tool_call_start",
            self.tool_call_end_token_id: "tool_call_end",
        }

    def is_reasoning_end(self, input_ids: list[int]) -> bool:
        """Return True when the think-end token id occurs in ``input_ids``."""
        return self.think_end_token_id in input_ids

    def find_last_special_token(self, prompt_token_ids: list[int]) -> int:
        """
        Return the last special token id found in ``prompt_token_ids``.

        The ids are scanned from the end towards the start; the first id that
        is a key of ``token_status_mapping`` is returned.

        Returns:
            int: The matching token id, or -1 when none is present.
        """
        for token_id in reversed(prompt_token_ids):
            if token_id in self.token_status_mapping:
                return token_id
        return -1

    def get_model_status(self, prompt_token_ids: list[int]) -> str:
        """
        Derive the parsing status from the prompt's last special token.

        Returns:
            str: "think_start" when the prompt holds no special token,
            otherwise the ``token_status_mapping`` entry of the last one.
        """
        special_token_id = self.find_last_special_token(prompt_token_ids)
        if special_token_id == -1:
            return "think_start"
        return self.token_status_mapping[special_token_id]

    def extract_reasoning_content_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        model_status: str,
    ) -> Union[DeltaMessage, None]:
        """
        Extract reasoning content from a delta message.
        Handles streaming output where previous + delta = current.
        Uses token IDs for faster processing.
        For text <think>abc</think>xyz:
        - 'abc' goes to reasoning_content
        - 'xyz' goes to content

        ``previous_text`` and ``previous_token_ids`` are accepted for
        interface compatibility and are not read here.

        Returns:
            Union[DeltaMessage, None]: The message to emit for this delta, or
            None when the delta must be swallowed (a lone think-end token,
            newline-only text right after the reasoning, or output that
            belongs to a tool call).
        """
        if model_status == "think_start":
            end_token = self.think_end_token

            # Reasoning is still open: the whole delta is reasoning text.
            if end_token not in current_text:
                return DeltaMessage(reasoning_content=delta_text)

            # Skip single special tokens
            if len(delta_token_ids) == 1 and delta_token_ids[0] == self.think_end_token_id:
                return None

            # A tool call directly follows the reasoning: only the reasoning
            # part of this delta (if any) is emitted, never the tool-call text.
            if self._is_with_tool(current_text=current_text, current_token_ids=current_token_ids):
                if end_token in delta_text:
                    reasoning_content = delta_text.partition(end_token)[0]
                    return DeltaMessage(reasoning_content=reasoning_content)
                return None

            # The delta itself straddles the end of the reasoning.
            if end_token in delta_text:
                reasoning_content, _, content = delta_text.partition(end_token)
                if not content.strip("\n"):
                    # Only newlines follow the reasoning so far: hold them back.
                    return DeltaMessage(reasoning_content=reasoning_content) if reasoning_content else None
                if reasoning_content:
                    return DeltaMessage(reasoning_content=reasoning_content, content=content)
                return DeltaMessage(content=content)

            # Reasoning ended in an earlier delta; suppress output while
            # everything after the think-end token is still only newlines.
            answer_start = current_text.find(end_token) + len(end_token)
            if not current_text[answer_start:].strip("\n"):
                return None
            return DeltaMessage(content=delta_text)

        if model_status == "think_end":
            # Output that opens with a tool call is not surfaced as content.
            if current_text.lstrip("\n").startswith(self.tool_call_start_token):
                return None
            return DeltaMessage(content=delta_text)

        return None

    def extract_reasoning_content(
        self,
        model_output: str,
        request: ChatCompletionRequest,
        model_status: str,
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Extract reasoning content from the model output.

        For text <think>abc</think>xyz:
        - 'abc' goes to reasoning_content
        - 'xyz' goes to content

        Args:
            model_output: Complete generated text.
            request: Originating request; not read by this implementation.
            model_status: Status string from :meth:`get_model_status`.

        Returns:
            tuple[Optional[str], Optional[str]]: reasoning content and content
        """
        if model_status == "think_start":
            # No think-end token: the entire output is reasoning.
            if self.think_end_token not in model_output:
                return model_output, ""

            reasoning_content, _, content = model_output.partition(self.think_end_token)
            if self.tool_call_start_token in content:
                prefix = content.partition(self.tool_call_start_token)[0]
                # Content is kept only when real text precedes the tool call;
                # a tool call right after the reasoning yields empty content.
                if prefix.lstrip("\n"):
                    return reasoning_content, content
                return reasoning_content, ""
            return reasoning_content, content

        if model_status == "think_end":
            if model_output.lstrip("\n").startswith(self.tool_call_start_token):
                return "", ""
            return "", model_output

        return "", ""

    def _is_with_tool(self, current_text: str, current_token_ids: Sequence[int]) -> bool:
        """
        Tell whether a tool call directly follows the reasoning section.

        True only when the tool-call start token id is among
        ``current_token_ids`` and nothing but newlines lies between the
        think-end token and the tool-call start token in ``current_text``.
        The caller is expected to have checked that ``current_text`` contains
        the think-end token.
        """
        if self.tool_call_start_token_id not in current_token_ids:
            return False

        think_end = current_text.find(self.think_end_token) + len(self.think_end_token)
        prefix = current_text[think_end:].partition(self.tool_call_start_token)[0]
        return not prefix.strip("\n")