diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index 1a461d81e..03a1a8b19 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -1292,7 +1292,7 @@ class FDConfig:
         ), "TP and EP cannot be enabled at the same time"

         if not self.cache_config.enable_chunked_prefill:
-            if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
+            if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
                 assert self.max_num_batched_tokens >= self.max_model_len, (
                     f"max_num_batched_tokens: {self.max_num_batched_tokens} "
                     f"should be larger than or equal to max_model_len: {self.max_model_len}"
diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py
index 0d0dedbe2..b3a8b7e56 100644
--- a/fastdeploy/engine/args_utils.py
+++ b/fastdeploy/engine/args_utils.py
@@ -392,6 +392,14 @@ class EngineArgs:
                 raise NotImplementedError("Logprob does not support enable_expert_parallel.")
             if not current_platform.is_cuda():
                 raise NotImplementedError("Only CUDA platform supports logprob.")
+        if self.speculative_config is not None:
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+        if self.splitwise_role != "mixed":
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+        if not current_platform.is_cuda():
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+        if self.guided_decoding_backend != "off":
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0

     @staticmethod
     def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index e6b8442a1..929e093c4 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -552,8 +552,6 @@ class EngineService:
                     get_request_pool.submit(_fetch_request)
             # 2. Schedule requests
             tasks = self.resource_manager.schedule()
-            main_process_metrics.num_requests_waiting.dec(len(tasks))
-            main_process_metrics.num_requests_running.inc(len(tasks))
             # 3. Send to engine
             if tasks:
                 self.resource_manager.get_real_bsz()
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index f073fdf60..8db1cca57 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -137,6 +137,8 @@ class ResourceManagerV1(ResourceManager):
                         self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
                     preempted_reqs.append(preempted_req)
                     scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
+                    main_process_metrics.num_requests_waiting.inc(1)
+                    main_process_metrics.num_requests_running.dec(1)
                     if preempted_req == request:
                         # No more request to preempt.
                         can_schedule = False
@@ -155,6 +157,7 @@ class ResourceManagerV1(ResourceManager):
         if not self.config.model_config.enable_mm:
             return num_new_tokens

+        request.with_image = False
        inputs = request.multimodal_inputs
         if inputs.get("patch_idx", None) is not None and inputs.get("patch_map", None) is not None:
             pre_end_idx = request.num_computed_tokens
@@ -198,8 +201,6 @@ class ResourceManagerV1(ResourceManager):
             and inputs.get("image_patch_id", None) is not None
             and inputs.get("grid_thw", None) is not None
         ):
-            request.with_image = False
-
             input_ids_lst = request.prompt_token_ids + request.output_token_ids
             input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
             input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
@@ -383,6 +384,8 @@ class ResourceManagerV1(ResourceManager):
                         token_budget -= num_new_tokens
                         request.num_computed_tokens += num_new_tokens
                         request.status = RequestStatus.RUNNING
+                        main_process_metrics.num_requests_waiting.dec(1)
+                        main_process_metrics.num_requests_running.inc(1)
                         allocated_position = self.get_available_position()
                         request.idx = allocated_position
                         self.tasks_list[allocated_position] = request
@@ -413,6 +416,8 @@ class ResourceManagerV1(ResourceManager):
                     token_budget -= num_new_tokens
                     request.num_computed_tokens += num_new_tokens
                     request.status = RequestStatus.RUNNING
+                    main_process_metrics.num_requests_waiting.dec(1)
+                    main_process_metrics.num_requests_running.inc(1)
                 else:
                     if self.config.cache_config.enable_prefix_caching:
                         self._free_blocks(request)
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 45a1b18c1..73b9d8cb2 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -81,7 +81,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # set traec exporter_otlp_headers.
     "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
     # enable kv cache block scheduler v1 (no need for kv_cache_ratio)
-    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")),
+    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
     # Whether to use PLUGINS.
     "FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","),
     # set trace attribute job_id.
@@ -107,5 +107,10 @@ def __getattr__(name: str):
     raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


+def __setattr__(name: str, value: Any):
+    assert name in environment_variables
+    environment_variables[name] = lambda: value
+
+
 def __dir__():
     return list(environment_variables.keys())
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index 378d12d36..d89990c53 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -1356,7 +1356,12 @@ class GPUModelRunner(ModelRunnerBase):
         Returns:
             A list of indices corresponding to the requests that need to be skipped.
         """
-        if not self.cache_config.enable_chunked_prefill or self.guided_backend is None or model_forward_batch is None:
+        if (
+            not self.cache_config.enable_chunked_prefill
+            or self.guided_backend is None
+            or model_forward_batch is None
+            or envs.ENABLE_V1_KVCACHE_SCHEDULER
+        ):
             return []

         skip_idx_list = []
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 2df3222a3..b15c3fb16 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -748,6 +748,23 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
     logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
     logger.info(f"- Load strategy: {load_config.load_strategy}")

+    if (
+        args.speculative_config is not None
+        and ("method" in args.speculative_config)
+        and (args.speculative_config["method"] is not None)
+    ):
+        logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0: speculative decoding is not supported yet.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+    if args.splitwise_role != "mixed":
+        logger.info(f"Set ENABLE_V1_KVCACHE_SCHEDULER to 0: splitwise_role {args.splitwise_role} is not supported yet.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+    if not current_platform.is_cuda():
+        logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0: only the CUDA platform is supported.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+    if parallel_config.guided_decoding_backend != "off":
+        logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0: guided decoding is not supported yet.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+
     fd_config = FDConfig(
         model_config=model_config,
         parallel_config=parallel_config,
diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py
index 71e420216..c68154013 100644
--- a/tests/utils/test_config.py
+++ b/tests/utils/test_config.py
@@ -1,5 +1,6 @@
 import unittest

+from fastdeploy import envs
 from fastdeploy.config import (
     CacheConfig,
     FDConfig,
@@ -48,7 +49,8 @@ class TestConfig(unittest.TestCase):
             ips="0.0.0.0",
             test_mode=True,
         )
-        assert fd_config.max_num_batched_tokens == 2048
+        if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            assert fd_config.max_num_batched_tokens == 2048

         cache_config.enable_chunked_prefill = False
         fd_config = FDConfig(
@@ -58,7 +60,8 @@
             ips="0.0.0.0",
             test_mode=True,
         )
-        assert fd_config.max_num_batched_tokens == 8192
+        if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            assert fd_config.max_num_batched_tokens == 8192

     def test_fdconfig_init_cache(self):
         parallel_config = ParallelConfig({})