[Feature] Set v1 scheduler as default in develop (#3807)

* Set scheduler v1 as default

* Set scheduler v1 as default

* Set scheduler v1 as default

* Set scheduler v1 as default

* Set scheduler v1 as default

* close V1 in guided_decoding

* fix vl ci

* close V1 in guided_decoding
This commit is contained in:
chenjian
2025-09-04 15:16:56 +08:00
committed by GitHub
parent e83251699f
commit 22c165d6dd
8 changed files with 50 additions and 9 deletions

View File

@@ -1292,7 +1292,7 @@ class FDConfig:
), "TP and EP cannot be enabled at the same time" ), "TP and EP cannot be enabled at the same time"
if not self.cache_config.enable_chunked_prefill: if not self.cache_config.enable_chunked_prefill:
if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")): if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
assert self.max_num_batched_tokens >= self.max_model_len, ( assert self.max_num_batched_tokens >= self.max_model_len, (
f"max_num_batched_tokens: {self.max_num_batched_tokens} " f"max_num_batched_tokens: {self.max_num_batched_tokens} "
f"should be larger than or equal to max_model_len: {self.max_model_len}" f"should be larger than or equal to max_model_len: {self.max_model_len}"

View File

@@ -392,6 +392,14 @@ class EngineArgs:
raise NotImplementedError("Logprob does not support enable_expert_parallel.") raise NotImplementedError("Logprob does not support enable_expert_parallel.")
if not current_platform.is_cuda(): if not current_platform.is_cuda():
raise NotImplementedError("Only CUDA platform supports logprob.") raise NotImplementedError("Only CUDA platform supports logprob.")
if self.speculative_config is not None:
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if self.splitwise_role != "mixed":
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if not current_platform.is_cuda():
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if self.guided_decoding_backend != "off":
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
@staticmethod @staticmethod
def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:

View File

@@ -552,8 +552,6 @@ class EngineService:
get_request_pool.submit(_fetch_request) get_request_pool.submit(_fetch_request)
# 2. Schedule requests # 2. Schedule requests
tasks = self.resource_manager.schedule() tasks = self.resource_manager.schedule()
main_process_metrics.num_requests_waiting.dec(len(tasks))
main_process_metrics.num_requests_running.inc(len(tasks))
# 3. Send to engine # 3. Send to engine
if tasks: if tasks:
self.resource_manager.get_real_bsz() self.resource_manager.get_real_bsz()

View File

@@ -137,6 +137,8 @@ class ResourceManagerV1(ResourceManager):
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id) self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
preempted_reqs.append(preempted_req) preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req)) scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.num_requests_running.dec(1)
if preempted_req == request: if preempted_req == request:
# No more request to preempt. # No more request to preempt.
can_schedule = False can_schedule = False
@@ -155,6 +157,7 @@ class ResourceManagerV1(ResourceManager):
if not self.config.model_config.enable_mm: if not self.config.model_config.enable_mm:
return num_new_tokens return num_new_tokens
request.with_image = False
inputs = request.multimodal_inputs inputs = request.multimodal_inputs
if inputs.get("patch_idx", None) is not None and inputs.get("patch_map", None) is not None: if inputs.get("patch_idx", None) is not None and inputs.get("patch_map", None) is not None:
pre_end_idx = request.num_computed_tokens pre_end_idx = request.num_computed_tokens
@@ -198,8 +201,6 @@ class ResourceManagerV1(ResourceManager):
and inputs.get("image_patch_id", None) is not None and inputs.get("image_patch_id", None) is not None
and inputs.get("grid_thw", None) is not None and inputs.get("grid_thw", None) is not None
): ):
request.with_image = False
input_ids_lst = request.prompt_token_ids + request.output_token_ids input_ids_lst = request.prompt_token_ids + request.output_token_ids
input_ids = paddle.to_tensor(input_ids_lst, dtype="int64") input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
input_ids = paddle.to_tensor(input_ids_lst, dtype="int64") input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
@@ -383,6 +384,8 @@ class ResourceManagerV1(ResourceManager):
token_budget -= num_new_tokens token_budget -= num_new_tokens
request.num_computed_tokens += num_new_tokens request.num_computed_tokens += num_new_tokens
request.status = RequestStatus.RUNNING request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
allocated_position = self.get_available_position() allocated_position = self.get_available_position()
request.idx = allocated_position request.idx = allocated_position
self.tasks_list[allocated_position] = request self.tasks_list[allocated_position] = request
@@ -413,6 +416,8 @@ class ResourceManagerV1(ResourceManager):
token_budget -= num_new_tokens token_budget -= num_new_tokens
request.num_computed_tokens += num_new_tokens request.num_computed_tokens += num_new_tokens
request.status = RequestStatus.RUNNING request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
else: else:
if self.config.cache_config.enable_prefix_caching: if self.config.cache_config.enable_prefix_caching:
self._free_blocks(request) self._free_blocks(request)

View File

@@ -81,7 +81,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
# set traec exporter_otlp_headers. # set traec exporter_otlp_headers.
"EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"), "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
# enable kv cache block scheduler v1 (no need for kv_cache_ratio) # enable kv cache block scheduler v1 (no need for kv_cache_ratio)
"ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")), "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
# Whether to use PLUGINS. # Whether to use PLUGINS.
"FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","), "FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","),
# set trace attribute job_id. # set trace attribute job_id.
@@ -107,5 +107,10 @@ def __getattr__(name: str):
raise AttributeError(f"module {__name__!r} has no attribute {name!r}") raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
def __setattr__(name: str, value: Any):
assert name in environment_variables
environment_variables[name] = lambda: value
def __dir__(): def __dir__():
return list(environment_variables.keys()) return list(environment_variables.keys())

View File

@@ -1356,7 +1356,12 @@ class GPUModelRunner(ModelRunnerBase):
Returns: Returns:
A list of indices corresponding to the requests that need to be skipped. A list of indices corresponding to the requests that need to be skipped.
""" """
if not self.cache_config.enable_chunked_prefill or self.guided_backend is None or model_forward_batch is None: if (
not self.cache_config.enable_chunked_prefill
or self.guided_backend is None
or model_forward_batch is None
or envs.ENABLE_V1_KVCACHE_SCHEDULER
):
return [] return []
skip_idx_list = [] skip_idx_list = []

View File

@@ -748,6 +748,23 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}") logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
logger.info(f"- Load strategy: {load_config.load_strategy}") logger.info(f"- Load strategy: {load_config.load_strategy}")
if (
args.speculative_config is not None
and ("method" in args.speculative_config)
and (args.speculative_config["method"] is not None)
):
logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not support speculative decoding now.")
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if args.splitwise_role != "mixed":
logger.info(f"Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported {args.splitwise_role} now.")
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if not current_platform.is_cuda():
logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported.")
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if parallel_config.guided_decoding_backend != "off":
logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported guided_decoding.")
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
fd_config = FDConfig( fd_config = FDConfig(
model_config=model_config, model_config=model_config,
parallel_config=parallel_config, parallel_config=parallel_config,

View File

@@ -1,5 +1,6 @@
import unittest import unittest
from fastdeploy import envs
from fastdeploy.config import ( from fastdeploy.config import (
CacheConfig, CacheConfig,
FDConfig, FDConfig,
@@ -48,6 +49,7 @@ class TestConfig(unittest.TestCase):
ips="0.0.0.0", ips="0.0.0.0",
test_mode=True, test_mode=True,
) )
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
assert fd_config.max_num_batched_tokens == 2048 assert fd_config.max_num_batched_tokens == 2048
cache_config.enable_chunked_prefill = False cache_config.enable_chunked_prefill = False
@@ -58,6 +60,7 @@ class TestConfig(unittest.TestCase):
ips="0.0.0.0", ips="0.0.0.0",
test_mode=True, test_mode=True,
) )
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
assert fd_config.max_num_batched_tokens == 8192 assert fd_config.max_num_batched_tokens == 8192
def test_fdconfig_init_cache(self): def test_fdconfig_init_cache(self):