diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ce8942933..a5939966c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -3,6 +3,7 @@ default_install_hook_types:
   - commit-msg
 default_stages:
   - pre-commit # Run locally
+  - commit-msg
   # - manual # Run in CI
 repos:
   - repo: https://github.com/psf/black.git
diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index bc0b98212..d0e6defae 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -860,7 +860,7 @@ class LLMEngine:
             )

         if self.do_profile:
-            get_profile_block_num = np.zeros([self.cfg.worker_num_per_node], dtype=np.int32)
+            get_profile_block_num = np.zeros([1], dtype=np.int32)
             self.get_profile_block_num_signal = IPCSignal(
                 name="get_profile_block_num",
                 array=get_profile_block_num,
@@ -1118,15 +1118,9 @@ class LLMEngine:
         Stop profiling of the model server and reset variables.
         """
         self.do_profile = 0
-        num_gpu_blocks = -1
-        for i in range(self.cfg.tensor_parallel_size):
-            while self.get_profile_block_num_signal.value[i] == 0:
-                time.sleep(1)
-            if num_gpu_blocks < 0:
-                num_gpu_blocks = self.get_profile_block_num_signal.value[i]
-            else:
-                num_gpu_blocks = min(num_gpu_blocks, self.get_profile_block_num_signal.value[i])
-
+        while self.get_profile_block_num_signal.value[0] == 0:
+            time.sleep(1)
+        num_gpu_blocks = self.get_profile_block_num_signal.value[0]
         self.cfg.cache_config.reset(num_gpu_blocks)
         self.resource_manager.reset_cache_config(self.cfg.cache_config)
         if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != "mixed":
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index 96dbdf1bb..8746b65eb 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -141,7 +141,8 @@ class EngineClient:
         task["preprocess_end_time"] = time.time()
         preprocess_cost_time = task["preprocess_end_time"] - task["preprocess_start_time"]
         api_server_logger.info(
-            f"Cache request with request_id ({task.get('request_id')}), " f"preprocess time cost {preprocess_cost_time}"
+            f"Cache request with request_id ({task.get('request_id')}), "
+            f"preprocess time cost {preprocess_cost_time}"
         )

         self.vaild_parameters(task)
diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py
index 2a1d3c56f..8ee6396fc 100644
--- a/fastdeploy/model_executor/forward_meta.py
+++ b/fastdeploy/model_executor/forward_meta.py
@@ -110,6 +110,7 @@ class XPUForwardMeta(ForwardMeta):
     """
     XPUForwardMeta is used to store the global meta information of the forward, and some XPU specific meta info.
     """
+
     # Accumulated offset
     cum_offsets: Optional[paddle.Tensor] = None
     # TODO(wanghaitao): Supplementary notes
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 72ea36c24..eba0250cc 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -375,42 +375,33 @@ class PaddleDisWorkerProc:
             logger.info(f"------- model_block_memory_used:{model_block_memory_used} --------")
             logger.info(f"------- num_blocks_local:{num_blocks_local} --------")

-            logger.info(f"self.fd_config.parallel_config.do_profile:{self.fd_config.parallel_config.do_profile}")
-
-            # 3. Send IPCSignal
-            get_profile_block_num = np.zeros(shape=[self.ranks], dtype=np.int32)
-            self.get_profile_block_num_signal = IPCSignal(
-                name="get_profile_block_num",
-                array=get_profile_block_num,
-                dtype=np.int32,
-                suffix=self.parallel_config.engine_pid,
-                create=False,
-            )
-            self.get_profile_block_num_signal.value[self.local_rank] = num_blocks_local
-
-            # Wait all worker send the signal
-            while np.any(self.get_profile_block_num_signal.value <= 0):
-                time.sleep(0.01)
-            num_blocks_global = self.get_profile_block_num_signal.value.min().item()
-
-            if num_blocks_global < 0:
-                logger.error(
-                    "The total number of blocks cannot be less than zero."
-                    "Please increase gpu_memory_utilization"
-                    "Or decrease max_num_batched_tokens(max model length) "
-                )
+            if num_blocks_local <= 0:
                 raise ValueError(
                     "The total number of blocks cannot be less than zero. "
                     "Please increase gpu_memory_utilization "
                     "or decrease max_num_batched_tokens (max model length)."
                 )

-            self.get_profile_block_num_signal.value[self.local_rank] = num_blocks_global
+            if self.ranks > 1:
+                num_blocks_local = paddle.full(shape=[1], fill_value=num_blocks_local, dtype="int32")
+                dist.all_reduce(num_blocks_local, op=dist.ReduceOp.MIN)
+                num_blocks_local = num_blocks_local.item()
+
+            if self.local_rank == 0:
+                # 3. Send IPCSignal
+                get_profile_block_num = np.zeros(shape=[1], dtype=np.int32)
+                self.get_profile_block_num_signal = IPCSignal(
+                    name="get_profile_block_num",
+                    array=get_profile_block_num,
+                    dtype=np.int32,
+                    suffix=self.parallel_config.engine_pid,
+                    create=False,
+                )
+                self.get_profile_block_num_signal.value[0] = num_blocks_local
         else:
-            num_blocks_global = self.fd_config.parallel_config.total_block_num
-        # NOTE(liuzichang): Too big num_blocks_global will lead to error 700
+            num_blocks_local = self.fd_config.parallel_config.total_block_num
         # 4. Update shared inputs
-        self.worker.reinitialize_kv_cache(num_gpu_blocks=num_blocks_global)
+        self.worker.reinitialize_kv_cache(num_gpu_blocks=num_blocks_local)

     def init_device(self) -> None:
         """Initialize device and Construct model runner"""
diff --git a/test/ci_use/EB_VL_Lite/rollout_model.py b/test/ci_use/EB_VL_Lite/rollout_model.py
index f9d5fedf0..ee540e0fa 100644
--- a/test/ci_use/EB_VL_Lite/rollout_model.py
+++ b/test/ci_use/EB_VL_Lite/rollout_model.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import difflib
-import os
 import argparse
+import difflib

 from paddleformers.trl.llm_utils import init_dist_env

@@ -24,12 +23,7 @@ from fastdeploy.rl.rollout_model import RolloutModel

 _, ranks = init_dist_env()
 parser = argparse.ArgumentParser()
-parser.add_argument(
-    "--model_path",
-    type=str,
-    required=True,
-    help="Path to the model directory"
-)
+parser.add_argument("--model_path", type=str, required=True, help="Path to the model directory")
 args = parser.parse_args()

 # base result
@@ -55,6 +49,7 @@ for k, v in actor_eval_model.state_dict().items():
 for k, v in actor_eval_model.get_name_mappings_to_training().items():
     content += f"{k}:{v}\n"

+
 def compare_strings(a: str, b: str) -> bool:
     if a == b:
         print("✅ The two strings are identical")
@@ -68,8 +63,11 @@ def compare_strings(a: str, b: str) -> bool:
         return False

+
 with open("baseline.txt", "r", encoding="utf-8") as f:
     baseline = f.read()
-    assert compare_strings(baseline, content), "In the unittest of RL scenario, your modification " \
-        "caused inconsistency in the content before and after. Please fix it. " \
+    assert compare_strings(baseline, content), (
+        "In the unittest of RL scenario, your modification "
+        "caused inconsistency in the content before and after. Please fix it. "
         "Can request assistance from yuanlehome or gzy19990617 (github id)."
+    )

diff --git a/test/ci_use/EB_VL_Lite/test_rollout_model.py b/test/ci_use/EB_VL_Lite/test_rollout_model.py
index a6d4b4127..9fbfc4821 100644
--- a/test/ci_use/EB_VL_Lite/test_rollout_model.py
+++ b/test/ci_use/EB_VL_Lite/test_rollout_model.py
@@ -12,11 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import os
 import subprocess
 import sys
-import os
-import time
-import pytest


 def test_rollout_model_with_distributed_launch():
@@ -29,26 +27,24 @@ def test_rollout_model_with_distributed_launch():

     base_path = os.getenv("MODEL_PATH")
     if base_path:
-        model_path=os.path.join(base_path, "ernie-4_5-vl-28b-a3b-bf16-paddle")
+        model_path = os.path.join(base_path, "ernie-4_5-vl-28b-a3b-bf16-paddle")
     else:
-        model_path="./ernie-4_5-vl-28b-a3b-bf16-paddle"
+        model_path = "./ernie-4_5-vl-28b-a3b-bf16-paddle"

     command = [
         sys.executable,
-        "-m", "paddle.distributed.launch",
-        "--gpus", "0,1",
+        "-m",
+        "paddle.distributed.launch",
+        "--gpus",
+        "0,1",
         rollout_script,
-        "--model_path", model_path,
+        "--model_path",
+        model_path,
     ]

     print(f"Executing command: {' '.join(command)}")

-    process = subprocess.Popen(
-        command,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        text=True
-    )
+    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

     try:
         stdout, stderr = process.communicate(timeout=300)
diff --git a/test/ci_use/iluvatar_UT/run_ernie300B_4layer.py b/test/ci_use/iluvatar_UT/run_ernie300B_4layer.py
index 33e28806e..0ccd387e2 100644
--- a/test/ci_use/iluvatar_UT/run_ernie300B_4layer.py
+++ b/test/ci_use/iluvatar_UT/run_ernie300B_4layer.py
@@ -8,9 +8,33 @@ prompts = [
 sampling_params = SamplingParams(temperature=0.8, top_p=0.00001, max_tokens=16)

 # Load the model
-llm = LLM(model="/data1/fastdeploy/ERNIE_300B_4L", tensor_parallel_size=16, max_model_len=8192, static_decode_blocks=0, quantization='wint8', block_size=16)
+llm = LLM(
+    model="/data1/fastdeploy/ERNIE_300B_4L",
+    tensor_parallel_size=16,
+    max_model_len=8192,
+    static_decode_blocks=0,
+    quantization="wint8",
+    block_size=16,
+)

 # Batch inference (the LLM queues requests internally and schedules them dynamically based on available resources)
 outputs = llm.generate(prompts, sampling_params)

-assert outputs[0].outputs.token_ids==[23768, 97000, 47814, 59335, 68170, 183, 49080, 94717, 82966, 99140, 31615, 51497, 94851, 60764, 10889, 2]
+assert outputs[0].outputs.token_ids == [
+    23768,
+    97000,
+    47814,
+    59335,
+    68170,
+    183,
+    49080,
+    94717,
+    82966,
+    99140,
+    31615,
+    51497,
+    94851,
+    60764,
+    10889,
+    2,
+]
diff --git a/test/layers/test_sampler.py b/test/layers/test_sampler.py
index c46e2c8bd..65a6bfbe6 100644
--- a/test/layers/test_sampler.py
+++ b/test/layers/test_sampler.py
@@ -73,6 +73,5 @@ def test_sampler():

     print(next_tokens)

-
 if __name__ == "__main__":
     test_sampler()