diff --git a/llm/server/server/engine/config.py b/llm/server/server/engine/config.py
index 4508c7e0c..25106eb83 100644
--- a/llm/server/server/engine/config.py
+++ b/llm/server/server/engine/config.py
@@ -93,9 +93,7 @@ class Config:
         self.use_cache_kv_int4 = int(os.getenv("USE_CACHE_KV_INT4", 0))
 
         # speculate decoding config
-        self.speculate_method = str(env.get("SPECULATE_METHOD", None))
-        self.speculate_max_draft_token_num = int(os.getenv("SPECULATE_MAX_DRAFT_TOKEN_NUM", 5))
-        self.speculate_max_ngram_size = int(os.getenv("SPECULATE_MAX_NGRAM_SIZE", 2))
+        self.speculate_method = os.getenv("SPECULATE_METHOD", None)
 
         # infer config
         self.max_batch_size = int(env.get("BATCH_SIZE", 50))
diff --git a/llm/server/server/engine/infer.py b/llm/server/server/engine/infer.py
index 51d33d3be..0f804b393 100644
--- a/llm/server/server/engine/infer.py
+++ b/llm/server/server/engine/infer.py
@@ -69,11 +69,16 @@ class ModelRunner:
         self.init_inputs()
 
         # whether use speculate decoding
-        if self.config.speculate_method is not None and self.config.speculate_method == "inference_with_reference":
-            self.proposer = InferenceWithReferenceProposer(
-                self.config.speculate_max_draft_token_num,
-                self.config.speculate_max_ngram_size,
-                self.args.max_batch_size)
+        logger.info(f'speculate_method: {self.config.speculate_method}')
+        if self.config.speculate_method is not None:
+            if self.config.speculate_method == "inference_with_reference":
+                self.proposer = InferenceWithReferenceProposer(
+                    self.model_cfg["speculate_max_draft_token_num"],
+                    self.model_cfg["speculate_max_ngram_size"],
+                    self.args.max_batch_size,
+                    self.args.max_seq_len)
+            else:
+                raise NotImplementedError(f'speculate_method {self.config.speculate_method} is not supported, only inference_with_reference is supported now.')
         else:
             self.proposer = None
 
@@ -274,18 +279,17 @@ class ModelRunner:
         self.share_inputs["ori_seq_lens_encoder"] = paddle.full(
             shape=[self.args.max_batch_size, 1], fill_value=0, dtype="int32")
         # speculate decoding input
+        logger.info(f'Speculative method: {self.config.speculate_method}')
        if self.config.speculate_method is not None:
-            self.share_inputs["input_ids_cpu"] = paddle.full(
-                shape=[self.args.max_batch_size, self.args.max_seq_len], fill_value=1, dtype='int64').cpu()
            self.share_inputs["accept_tokens"] = paddle.full(
-                shape=[self.args.max_batch_size, self.config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
+                shape=[self.args.max_batch_size, self.model_cfg["speculate_max_draft_token_num"] + 1], fill_value=0, dtype="int64"
            )
            self.share_inputs["accept_num"] = paddle.full(shape=[self.args.max_batch_size], fill_value=0, dtype="int32")
            self.share_inputs["draft_tokens"] = paddle.full(
-                shape=[self.args.max_batch_size, self.config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
+                shape=[self.args.max_batch_size, self.model_cfg["speculate_max_draft_token_num"] + 1], fill_value=0, dtype="int64"
            )
            self.share_inputs["actual_draft_token_num"] = paddle.full(
-                shape=[self.args.max_batch_size], fill_value=self.config.speculate_max_draft_token_num, dtype="int32"
+                shape=[self.args.max_batch_size], fill_value=self.model_cfg["speculate_max_draft_token_num"], dtype="int32"
            )
 
    def dy_input_preprocess(self, tasks):
@@ -344,10 +348,8 @@ class ModelRunner:
                        task["stop_seqs"], dtype="int64")
            if self.proposer is not None:
                if self.config.speculate_method == "inference_with_reference":
-                    speculate_update_input_ids_cpu(self.share_inputs['input_ids_cpu'], task['input_ids'], idx, self.args.max_seq_len)
-                    self.share_inputs["draft_tokens"][idx:idx + 1] = np.zeros([self.config.speculate_max_draft_token_num + 1])
-                    self.share_inputs["actual_draft_token_num"][idx:idx + 1] = np.array([self.config.speculate_max_draft_token_num])
-                    self.proposer.update(idx, length)
+                    self.share_inputs["draft_tokens"][idx:idx + 1] = np.zeros([self.model_cfg["speculate_max_draft_token_num"] + 1])
+                    self.share_inputs["actual_draft_token_num"][idx:idx + 1] = np.array([self.model_cfg["speculate_max_draft_token_num"]])
 
    def step_cuda(self, seq_lens_this_time):
        """
@@ -381,7 +383,7 @@ class ModelRunner:
                self.share_inputs['input_ids'], self.share_inputs['pre_ids'],
                self.share_inputs['step_idx'], self.share_inputs['next_tokens'],
                self.args.block_size, self.args.enc_dec_block_num, self.args.first_token_id,
-                self.config.speculate_max_draft_token_num)
+                self.model_cfg["speculate_max_draft_token_num"])
 
    def initialize_engine_ready_check_flag(self):
        """
@@ -512,7 +514,6 @@ class ModelRunner:
 
            if self.proposer is not None:
                logger.info("start run proposer")
                logger.info(f'before draft_tokens: {self.share_inputs["draft_tokens"]}')
-                logger.info(f'before accept_tokens: {self.share_inputs["accept_tokens"]}')
                self.proposer.run(
                    self.share_inputs,
@@ -521,19 +522,19 @@ class ModelRunner:
                )
                logger.info(f'after draft_tokens: {self.share_inputs["draft_tokens"]}')
                logger.info("finish run proposer")
-                logger.info(f'input_ids: {self.share_inputs["input_ids"]}')
-                logger.info(f'input_ids_cpu: {self.share_inputs["input_ids_cpu"]}')
-                logger.info(f'seq_lens_this_time: {self.share_inputs["seq_lens_this_time"]}')
-                logger.info(f'seq_lens_encoder: {self.share_inputs["seq_lens_encoder"]}')
-                logger.info(f'seq_lens_decoder: {self.share_inputs["seq_lens_decoder"]}')
-                logger.info(f'step_idx: {self.share_inputs["step_idx"]}')
-                logger.info(f'next_tokens: {self.share_inputs["next_tokens"]}')
-                logger.info(f'before block_tables: {self.share_inputs["block_tables"]}')
+                # logger.info(f'input_ids: {self.share_inputs["input_ids"]}')
+                # logger.info(f'input_ids_cpu: {self.share_inputs["input_ids_cpu"]}')
+                # logger.info(f'seq_lens_this_time: {self.share_inputs["seq_lens_this_time"]}')
+                # logger.info(f'seq_lens_encoder: {self.share_inputs["seq_lens_encoder"]}')
+                # logger.info(f'seq_lens_decoder: {self.share_inputs["seq_lens_decoder"]}')
+                # logger.info(f'step_idx: {self.share_inputs["step_idx"]}')
+                # logger.info(f'next_tokens: {self.share_inputs["next_tokens"]}')
+                # logger.info(f'before block_tables: {self.share_inputs["block_tables"]}')
 
            self.infer_engine.predictor.run()
            logger.info(f'after accept_tokens: {self.share_inputs["accept_tokens"]}')
            logger.info(f'after accept_num: {self.share_inputs["accept_num"]}')
-            logger.info(f'after block_tables: {self.share_inputs["block_tables"]}')
+            # logger.info(f'after block_tables: {self.share_inputs["block_tables"]}')
 
            self.share_inputs['infer_seed'].add_(infer_seed_increment)
            self.share_inputs['infer_seed'][:] %= self.MAX_INFER_SEED
diff --git a/llm/server/server/engine/proposers.py b/llm/server/server/engine/proposers.py
index 68d2b41c9..f2a1d2b0a 100644
--- a/llm/server/server/engine/proposers.py
+++ b/llm/server/server/engine/proposers.py
@@ -16,7 +16,6 @@ from __future__ import annotations
 from abc import ABC, abstractmethod
 
 import paddle
-from paddlenlp_ops import ngram_match
 
 
 class Proposer(ABC):
@@ -43,7 +42,7 @@ class InferenceWithReferenceProposer(Proposer):
    It match tokens in the input and output as draft tokens.
""" - def __init__(self, max_draft_token_num: int, max_ngram_size: int, max_batch_size: int): + def __init__(self, max_draft_token_num: int, max_ngram_size: int, max_batch_size: int, max_seq_len: int, **kwargs): """ Args: max_draft_token_num (int): @@ -54,34 +53,33 @@ class InferenceWithReferenceProposer(Proposer): The hyperparameter of n in the paper. max_batch_size (int): The maximum batch size. + max_seq_len (int): + The maximum sequence length. """ super().__init__() self.max_ngram_size = max_ngram_size self.input_ids_len = paddle.zeros(shape=[max_batch_size, 1], dtype="int64").cpu() + self.input_ids_cpu = paddle.zeros(shape=[max_batch_size, max_seq_len], dtype="int64").cpu() self.max_batch_size = max_batch_size self.max_draft_token_num = max_draft_token_num - # self.input_ids_cpu = paddle.full(shape=[max_batch_size, max_seq_len], fill_value=1, dtype="int64").cpu() - def update(self, bid: int, seq_len: int): - """ - Used when inserting a new query to update the length of the input_ids. - """ - self.input_ids_len[bid] = seq_len - - def run(self, share_inputs: dict[str, paddle.Tensor], **kargs): + def run(self, model_inputs: dict[str, paddle.Tensor], **kargs): """ Use ngram_match to get draft tokens from the input and output. """ - draft_tokens = share_inputs["draft_tokens"].cpu() + draft_tokens = model_inputs["draft_tokens"].cpu() seq_lens_this_time = kargs["seq_lens_this_time"].cpu() - seq_lens_encoder = share_inputs["seq_lens_encoder"].cpu() - seq_lens_decoder = share_inputs["seq_lens_decoder"].cpu() + seq_lens_encoder = model_inputs["seq_lens_encoder"].cpu() + seq_lens_decoder = model_inputs["seq_lens_decoder"].cpu() + + from paddlenlp_ops import ngram_match + ngram_match( - share_inputs["input_ids_cpu"], + self.input_ids_cpu, self.input_ids_len.cpu(), - share_inputs["pre_ids"].cpu(), - share_inputs["step_idx"].cpu(), - share_inputs["actual_draft_token_num"].cpu(), + model_inputs["pre_ids"].cpu(), + model_inputs["step_idx"].cpu(), + model_inputs["actual_draft_token_num"].cpu(), draft_tokens, seq_lens_this_time, seq_lens_encoder, @@ -90,6 +88,7 @@ class InferenceWithReferenceProposer(Proposer): self.max_ngram_size, self.max_draft_token_num, ) - share_inputs["draft_tokens"][:] = draft_tokens.cuda() - share_inputs["seq_lens_encoder"][:] = seq_lens_encoder.cuda() + + model_inputs["draft_tokens"][:] = draft_tokens.cuda() + model_inputs["seq_lens_encoder"][:] = seq_lens_encoder.cuda() kargs["seq_lens_this_time"][:] = seq_lens_this_time.cuda() diff --git a/llm/server/server/engine/token_processor.py b/llm/server/server/engine/token_processor.py index 1b2d6d596..88a09c900 100644 --- a/llm/server/server/engine/token_processor.py +++ b/llm/server/server/engine/token_processor.py @@ -41,7 +41,7 @@ class TokenProcessor(object): self.tokens_counter = Counter() if self.cfg.speculate_method is not None: - self.output_tokens = paddle.full(shape=[SPECULATE_MAX_BSZ * MAX_DRAFT_TOKEN_NUM + MAX_DRAFT_TOKEN_NUM + 2], fill_value=2, dtype="int64") + self.output_tokens = paddle.full(shape=[SPECULATE_MAX_BSZ * MAX_DRAFT_TOKEN_NUM + SPECULATE_MAX_BSZ + 2], fill_value=2, dtype="int64") else: self.output_tokens = paddle.full(shape=[self.cfg.max_batch_size + 2, 1], fill_value=2, dtype="int64") self.worker = None @@ -302,6 +302,7 @@ class TokenProcessor(object): batch post-processing function """ tokens = self.output_tokens.numpy() + model_server_logger.info(f"speculate_result tokens: {self.output_tokens.tolist()}") batch = self.output_tokens[1] output_token_msg_id = int(self.output_tokens[0]) accept_num = 
@@ -373,6 +374,21 @@ class WarmUpTokenProcessor(TokenProcessor):
            except Exception as e:
                model_server_logger.info("while get input_data error: {0} {1}".format(e, str(traceback.format_exc())))
 
+    def process_speculate_results(self):
+        """
+        read tokens from paddle inference engine and process
+        """
+        while self._is_running:
+            try:
+                rank_id = 0
+                speculate_get_output(self.output_tokens, rank_id, self._is_blocking)
+
+                if self.output_tokens[0] == -2:
+                    continue
+                self._process_speculate_output()
+            except Exception as e:
+                model_server_logger.info("while get input_data error: {0} {1}".format(e, str(traceback.format_exc())))
+
    def stop(self):
        """
        stop warm up thread
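
Note: a minimal sketch of how the reworked proposer is wired up after this change. The import path `server.engine.proposers`, the literal config values, and the batch/sequence sizes below are illustrative assumptions; in the server they come from the model config file, `SPECULATE_METHOD`, and the runner arguments shown in the diff above.

# Sketch only: mirrors the ModelRunner wiring above; values and import path are assumptions.
import os

from server.engine.proposers import InferenceWithReferenceProposer

model_cfg = {
    "speculate_max_draft_token_num": 5,  # illustrative value
    "speculate_max_ngram_size": 2,       # illustrative value
}
max_batch_size = 8
max_seq_len = 8192

speculate_method = os.getenv("SPECULATE_METHOD", None)
if speculate_method is None:
    proposer = None
elif speculate_method == "inference_with_reference":
    # input_ids_cpu is now owned by the proposer, so max_seq_len must be passed in.
    proposer = InferenceWithReferenceProposer(
        model_cfg["speculate_max_draft_token_num"],
        model_cfg["speculate_max_ngram_size"],
        max_batch_size,
        max_seq_len,
    )
else:
    raise NotImplementedError(f"speculate_method {speculate_method} is not supported.")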