diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index a00229b53..5e97c07b4 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -118,6 +118,7 @@ class ModelConfig:
         self.enable_redundant_experts = False
         self.redundant_experts_num = 0
         self.quantization = None
+        self.think_end_id = None
         for key, value in args.items():
             if hasattr(self, key):
                 setattr(self, key, value)
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index e7edacb26..45b11f914 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -121,8 +121,6 @@ class EngineClient:
         task["prompt_token_ids_len"] = len(task["prompt_token_ids"])
         input_ids_len = task["prompt_token_ids_len"]
         task["max_tokens"] = min(self.max_model_len - input_ids_len, task.get("max_tokens"))
-        if task.get("reasoning_max_tokens", None) is None:
-            task["reasoning_max_tokens"] = max(int(task["max_tokens"] * 0.8), 1)
         min_tokens = task.get("min_tokens", 1)
         if "messages" in task:
             del task["messages"]
diff --git a/fastdeploy/input/ernie_vl_processor.py b/fastdeploy/input/ernie_vl_processor.py
index 19f05671e..af2978881 100644
--- a/fastdeploy/input/ernie_vl_processor.py
+++ b/fastdeploy/input/ernie_vl_processor.py
@@ -246,6 +246,10 @@ class ErnieMoEVLProcessor(ErnieProcessor):
             request["prompt_token_ids"] = request["prompt_token_ids"][: max_model_len - 1]
         if request.get("max_tokens") is None:
             request["max_tokens"] = max(1, max_model_len - len(request["prompt_token_ids"]))
+        else:
+            request["max_tokens"] = min(max_model_len - len(request["prompt_token_ids"]), request["max_tokens"])
+        if not request.get("reasoning_max_tokens"):
+            request["reasoning_max_tokens"] = max(int(request["max_tokens"] * 0.8), 1)
         data_processor_logger.info(f"Processed request {request}")
         return request

diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py
index 5a14d77b4..66296e2ae 100644
--- a/fastdeploy/model_executor/pre_and_post_process.py
+++ b/fastdeploy/model_executor/pre_and_post_process.py
@@ -160,7 +160,7 @@ def post_process_normal(
 ) -> ModelRunnerOutput:
     """Post-processing steps after completing a single token generation."""
     # handle vl:
-    if model_output.enable_thinking:
+    if model_output.enable_thinking and model_output.think_end_id is not None:
         exists_think_end = sampler_output.sampled_token_ids == model_output.think_end_id
         paddle.assign(
             paddle.where(
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index 50fc205b8..378543087 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -245,16 +245,21 @@ class GPUModelRunner(ModelRunnerBase):
                     ).unsqueeze([0])
                 else:
                     position_ids = None
-
-                enable_thinking = request.get("enable_thinking", True)
-                enable_thinking = enable_thinking if enable_thinking is not None else True
-                self.share_inputs["enable_thinking"][:] = enable_thinking
-                self.share_inputs["need_think_end"][idx : idx + 1, :] = 1 if enable_thinking else 0
-                self.share_inputs["reasoning_index"][idx : idx + 1, :] = request.get("reasoning_max_tokens", 2048)
                 self.share_inputs["rope_emb"][idx : idx + 1, :] = self.prepare_rope3d(
                     position_ids, request.get("max_tokens", 2048)
                 )

+                if request.get("enable_thinking", False) and request.get("reasoning_max_tokens") is not None:
+                    # Enable thinking
+                    self.share_inputs["enable_thinking"][:] = True
+                    self.share_inputs["need_think_end"][idx : idx + 1, :] = 1
self.share_inputs["reasoning_index"][idx : idx + 1, :] = request.get("reasoning_max_tokens") + else: + # Disable thinking + self.share_inputs["enable_thinking"][:] = False + self.share_inputs["need_think_end"][idx : idx + 1, :] = 0 + self.share_inputs["reasoning_index"][idx : idx + 1, :] = 0 + input_ids = request.prompt_token_ids + request.output_token_ids logger.debug( f"Handle prefill request {request} at idx {idx} prefill_start_index {prefill_start_index} prefill_end_index {prefill_end_index} need_prefilled_token_num {len(input_ids)}" @@ -447,16 +452,22 @@ class GPUModelRunner(ModelRunnerBase): self.share_inputs["prompt_lens"][idx : idx + 1] = length if self.enable_mm: - enable_thinking = request.get("enable_thinking", True) - enable_thinking = enable_thinking if enable_thinking is not None else True - self.share_inputs["enable_thinking"][:] = enable_thinking - self.share_inputs["need_think_end"][idx : idx + 1, :] = 1 if enable_thinking else 0 - self.share_inputs["reasoning_index"][idx : idx + 1, :] = request.get("reasoning_max_tokens", 2048) self.share_inputs["rope_emb"][idx : idx + 1, :] = self.prepare_rope3d( position_ids, request.get("max_tokens", 2048) ) self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 + if request.get("enable_thinking", False) and request.get("reasoning_max_tokens") is not None: + # Enable thinking + self.share_inputs["enable_thinking"][:] = True + self.share_inputs["need_think_end"][idx : idx + 1, :] = 1 + self.share_inputs["reasoning_index"][idx : idx + 1, :] = request.get("reasoning_max_tokens") + else: + # Disable thinking + self.share_inputs["enable_thinking"][:] = False + self.share_inputs["need_think_end"][idx : idx + 1, :] = 0 + self.share_inputs["reasoning_index"][idx : idx + 1, :] = 0 + def get_attr_from_request(request, attr, default_value=None): res = request.get(attr, default_value) if res is not None: @@ -669,6 +680,11 @@ class GPUModelRunner(ModelRunnerBase): # Initialize rotary position embedding tmp_position_ids = paddle.arange(self.parallel_config.max_model_len).reshape((1, -1)) + # Initialize thinking related buffers + self.share_inputs["need_think_end"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") + self.share_inputs["enable_thinking"] = paddle.full(shape=[1], fill_value=False, dtype="bool") + self.share_inputs["reasoning_index"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") + # TODO(gongshaotian): move to models if not self.enable_mm: self.share_inputs["rope_emb"] = get_rope( @@ -755,9 +771,6 @@ class GPUModelRunner(ModelRunnerBase): dtype="float32", ) self.share_inputs["image_features"] = None - self.share_inputs["need_think_end"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") - self.share_inputs["enable_thinking"] = paddle.full(shape=[1], fill_value=True, dtype="bool") - self.share_inputs["reasoning_index"] = paddle.full(shape=[max_num_seqs, 1], fill_value=0, dtype="int32") def _prepare_inputs(self) -> None: """Prepare the model inputs""" @@ -1106,10 +1119,10 @@ class GPUModelRunner(ModelRunnerBase): ), accept_tokens=(self.share_inputs["accept_tokens"] if self.speculative_decoding else None), accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None), - enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None), - think_end_id=(self.model_config.think_end_id if self.enable_mm else -1), - need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None), - 
-            reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
+            enable_thinking=self.share_inputs["enable_thinking"],
+            think_end_id=self.model_config.think_end_id,
+            need_think_end=self.share_inputs["need_think_end"],
+            reasoning_index=self.share_inputs["reasoning_index"],
             stop_token_ids=self.share_inputs["stop_seqs"],
             stop_seqs_len=self.share_inputs["stop_seqs_len"],
         )
@@ -1374,10 +1387,10 @@ class GPUModelRunner(ModelRunnerBase):
             ),
             accept_tokens=(self.share_inputs["accept_tokens"] if self.speculative_decoding else None),
             accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
-            enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
-            think_end_id=(self.model_config.think_end_id if self.enable_mm else -1),
-            need_think_end=(self.share_inputs["need_think_end"][:num_running_requests] if self.enable_mm else None),
-            reasoning_index=(self.share_inputs["reasoning_index"][:num_running_requests] if self.enable_mm else None),
+            enable_thinking=self.share_inputs["enable_thinking"],
+            think_end_id=self.model_config.think_end_id,
+            need_think_end=self.share_inputs["need_think_end"][:num_running_requests],
+            reasoning_index=self.share_inputs["reasoning_index"][:num_running_requests],
             stop_token_ids=self.share_inputs["stop_seqs"],
             stop_seqs_len=self.share_inputs["stop_seqs_len"],
         )
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 71ed1ff66..d556f68c2 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -123,6 +123,28 @@ def update_fd_config_for_mm(fd_config: FDConfig) -> None:
     fd_config.model_config.sequence_parallel = fd_config.parallel_config.sequence_parallel


+def update_think_end_id_for_ernie(fd_config: FDConfig) -> None:
+    """
+    Updates the think_end_id in the model config. Uses the ID of '</think>'
+    if it exists, otherwise defaults to None.
+    """
+    is_ernie = ErnieArchitectures.contains_ernie_arch(fd_config.model_config.architectures)
+    if is_ernie:
+        tokenizer = ErnieBotTokenizer.from_pretrained(
+            fd_config.model_config.model,
+            model_max_length=fd_config.parallel_config.max_model_len,
+            padding_side="right",
+            use_fast=False,
+        )
+
+        vocab = tokenizer.get_vocab()
+        fd_config.model_config.think_end_id = vocab.get("</think>", None)
+        if fd_config.model_config.think_end_id is not None:
+            logger.info(f"Got think_end_id {fd_config.model_config.think_end_id} from vocab.")
+        else:
+            logger.info("No '</think>' token found in vocabulary; the model cannot do reasoning.")
+
+
 class PaddleDisWorkerProc:
     """
     Paddle Distributed wrapper for fastdeploy.worker.Worker,
@@ -710,7 +732,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
         cache_config=cache_config,
     )
     update_fd_config_for_mm(fd_config)
-
+    update_think_end_id_for_ernie(fd_config)
     return fd_config
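For context on the behavioral change above: the 80% default for reasoning_max_tokens is no longer applied in EngineClient; it is now computed in ErnieMoEVLProcessor after max_tokens has been clamped to the remaining context window, and the runner only enables thinking for a request when enable_thinking is set and reasoning_max_tokens is present. Below is a minimal standalone sketch of the new budget logic; the helper name resolve_token_budgets and the demo values are illustrative and not part of the patch.

# Sketch only: mirrors the clamping/defaulting added to
# ErnieMoEVLProcessor.process_request_dict by this patch.
def resolve_token_budgets(request: dict, max_model_len: int) -> dict:
    prompt_len = len(request["prompt_token_ids"])
    if request.get("max_tokens") is None:
        # No explicit budget: use everything left in the context window.
        request["max_tokens"] = max(1, max_model_len - prompt_len)
    else:
        # Explicit budget: clamp it to the remaining context window.
        request["max_tokens"] = min(max_model_len - prompt_len, request["max_tokens"])
    if not request.get("reasoning_max_tokens"):
        # Default the thinking budget to 80% of the generation budget.
        request["reasoning_max_tokens"] = max(int(request["max_tokens"] * 0.8), 1)
    return request


if __name__ == "__main__":
    req = {"prompt_token_ids": list(range(1000)), "max_tokens": 4096}
    resolve_token_budgets(req, max_model_len=8192)
    print(req["max_tokens"], req["reasoning_max_tokens"])  # 4096 3276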