[Feature] Set scheduler v1 as default (#3812)

Author: chenjian
Date: 2025-09-04 11:02:10 +08:00
Committed by: GitHub
Parent: fbf0e9d2aa
Commit: fb1e0d6a87
8 changed files with 43 additions and 7 deletions


@@ -1290,7 +1290,7 @@ class FDConfig:
             ), "TP and EP cannot be enabled at the same time"
         if not self.cache_config.enable_chunked_prefill:
-            if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
+            if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
                 assert self.max_num_batched_tokens >= self.max_model_len, (
                     f"max_num_batched_tokens: {self.max_num_batched_tokens} "
                     f"should be larger than or equal to max_model_len: {self.max_model_len}"
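
For orientation, here is a minimal stand-alone sketch (hypothetical values) of the constraint this hunk now gates on the cached envs attribute instead of a fresh os.getenv() read, presumably so that the runtime overrides added elsewhere in this commit are honored: without chunked prefill and without the v1 scheduler, a full max_model_len prompt must fit into a single batch.

# Hypothetical values for illustration; FastDeploy derives them from its configs.
max_model_len = 8192
max_num_batched_tokens = 8192
enable_chunked_prefill = False
enable_v1_kvcache_scheduler = True  # the new default; set False to exercise the assert

if not enable_chunked_prefill and not enable_v1_kvcache_scheduler:
    assert max_num_batched_tokens >= max_model_len, (
        f"max_num_batched_tokens: {max_num_batched_tokens} "
        f"should be larger than or equal to max_model_len: {max_model_len}"
    )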


@@ -392,6 +392,12 @@ class EngineArgs:
                 raise NotImplementedError("Logprob does not support enable_expert_parallel.")
             if not current_platform.is_cuda():
                 raise NotImplementedError("Only CUDA platform supports logprob.")
+        if self.speculative_config is not None:
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+        if self.splitwise_role != "mixed":
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+        if not current_platform.is_cuda():
+            envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
 
     @staticmethod
     def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
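
The three new guards implement a simple fallback policy on top of the flipped default. Below is a sketch of the effective behavior using a hypothetical helper (resolve_v1_scheduler is not FastDeploy API): the flag starts from the environment default, now 1, and is forced back to 0 for speculative decoding, for any splitwise_role other than "mixed", and on non-CUDA platforms.

def resolve_v1_scheduler(env_default: int, speculative: bool,
                         splitwise_role: str, is_cuda: bool) -> int:
    # Apply the overrides in the same order as EngineArgs does.
    flag = env_default
    if speculative:
        flag = 0  # speculative decoding is not supported by the v1 scheduler yet
    if splitwise_role != "mixed":
        flag = 0  # disaggregated prefill/decode roles fall back to the old path
    if not is_cuda:
        flag = 0  # only the CUDA platform is supported
    return flag

assert resolve_v1_scheduler(1, False, "mixed", True) == 1
assert resolve_v1_scheduler(1, True, "mixed", True) == 0
assert resolve_v1_scheduler(1, False, "prefill", True) == 0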


@@ -552,8 +552,6 @@ class EngineSevice:
                 get_request_pool.submit(_fetch_request)
                 # 2. Schedule requests
                 tasks = self.resource_manager.schedule()
-                main_process_metrics.num_requests_waiting.dec(len(tasks))
-                main_process_metrics.num_requests_running.inc(len(tasks))
                 # 3. Send to engine
                 if tasks:
                     self.resource_manager.get_real_bsz()


@@ -123,6 +123,8 @@ class ResourceManagerV1(ResourceManager):
                 self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
                 preempted_reqs.append(preempted_req)
                 scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
+                main_process_metrics.num_requests_waiting.inc(1)
+                main_process_metrics.num_requests_running.dec(1)
                 if preempted_req == request:
                     # No more request to preempt.
                     can_schedule = False
@@ -368,6 +370,8 @@
                     token_budget -= num_new_tokens
                     request.num_computed_tokens += num_new_tokens
                     request.status = RequestStatus.RUNNING
+                    main_process_metrics.num_requests_waiting.dec(1)
+                    main_process_metrics.num_requests_running.inc(1)
                     allocated_position = self.get_available_position()
                     request.idx = allocated_position
                     self.tasks_list[allocated_position] = request
@@ -398,6+402,8 @@
                     token_budget -= num_new_tokens
                     request.num_computed_tokens += num_new_tokens
                     request.status = RequestStatus.RUNNING
+                    main_process_metrics.num_requests_waiting.dec(1)
+                    main_process_metrics.num_requests_running.inc(1)
                 else:
                     if self.config.cache_config.enable_prefix_caching:
                         self._free_blocks(request)
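
Together with the deletion in the EngineSevice hunk above, these additions move the queue-depth accounting from the batch level (one dec/inc of len(tasks) per schedule call) to per-request state transitions inside ResourceManagerV1. A minimal sketch of the pattern, assuming the counters behave like prometheus_client gauges (the metric names and helper functions below are illustrative, not FastDeploy's actual metric registry):

from prometheus_client import Gauge

num_requests_waiting = Gauge("num_requests_waiting", "Requests queued for scheduling")
num_requests_running = Gauge("num_requests_running", "Requests currently running")

def on_scheduled() -> None:
    # WAITING -> RUNNING: the request leaves the queue and starts running.
    num_requests_waiting.dec(1)
    num_requests_running.inc(1)

def on_preempted() -> None:
    # RUNNING -> WAITING: the request is pushed back to be rescheduled later.
    num_requests_waiting.inc(1)
    num_requests_running.dec(1)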


@@ -81,7 +81,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # set traec exporter_otlp_headers.
     "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
     # enable kv cache block scheduler v1 (no need for kv_cache_ratio)
-    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")),
+    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
     # Whether to use PLUGINS.
     "FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","),
     # set trace attribute job_id.
@@ -105,5 +105,10 @@ def __getattr__(name: str):
     raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
 
 
+def __setattr__(name: str, value: Any):
+    assert name in environment_variables
+    environment_variables[name] = lambda: value
+
+
 def __dir__():
     return list(environment_variables.keys())
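
These two hunks work together: the first flips the built-in default to 1, and the second adds a setter hook so the rest of this commit can switch the flag back off at runtime. Below is a self-contained sketch of the lazy env-var pattern used here, built on the PEP 562 module-level __getattr__; the module layout and the demo at the bottom are illustrative, not FastDeploy's actual envs.py.

import os
import sys
from typing import Any, Callable

environment_variables: dict[str, Callable[[], Any]] = {
    # Evaluated lazily on each access, so the process environment is read at
    # use time rather than at import time.
    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
}

def __getattr__(name: str):
    # PEP 562: called for attribute lookups the module dict cannot satisfy.
    if name in environment_variables:
        return environment_variables[name]()
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

def __setattr__(name: str, value: Any):
    # Mirrors the hook added in this commit: replace the lazy callable with a constant.
    assert name in environment_variables
    environment_variables[name] = lambda: value

if __name__ == "__main__":
    this = sys.modules[__name__]
    print(getattr(this, "ENABLE_V1_KVCACHE_SCHEDULER"))  # 1 unless the env var overrides it
    __setattr__("ENABLE_V1_KVCACHE_SCHEDULER", 0)  # explicit call to the hook above
    print(getattr(this, "ENABLE_V1_KVCACHE_SCHEDULER"))  # 0

Note that plain attribute assignment on the imported module (envs.ENABLE_V1_KVCACHE_SCHEDULER = 0, as the other hunks in this commit do) does not route through a module-level __setattr__; it writes straight into the module's __dict__, which equally shadows the lazy lookup, so either path makes the override visible to later reads.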


@@ -1337,7 +1337,11 @@ class GPUModelRunner(ModelRunnerBase):
             A list of indices corresponding to the requests that need to be skipped.
         """
         skip_idx_list = []
-        if not self.cache_config.enable_chunked_prefill or self.guided_backend is None:
+        if (
+            not self.cache_config.enable_chunked_prefill
+            or self.guided_backend is None
+            or envs.ENABLE_V1_KVCACHE_SCHEDULER
+        ):
             return skip_idx_list
 
         for task in model_forward_batch:


@@ -740,6 +740,20 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
     logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
     logger.info(f"- Load strategy: {load_config.load_strategy}")
 
+    if (
+        args.speculative_config is not None
+        and ("method" in args.speculative_config)
+        and (args.speculative_config["method"] is not None)
+    ):
+        logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not support speculative decoding now.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+    if args.splitwise_role != "mixed":
+        logger.info(f"Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported {args.splitwise_role} now.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+    if not current_platform.is_cuda():
+        logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported.")
+        envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
+
     fd_config = FDConfig(
         model_config=model_config,
         parallel_config=parallel_config,


@@ -1,5 +1,6 @@
 import unittest
 
+from fastdeploy import envs
 from fastdeploy.config import (
     CacheConfig,
     FDConfig,
@@ -48,7 +49,8 @@ class TestConfig(unittest.TestCase):
             ips="0.0.0.0",
             test_mode=True,
         )
-        assert fd_config.max_num_batched_tokens == 2048
+        if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            assert fd_config.max_num_batched_tokens == 2048
 
         cache_config.enable_chunked_prefill = False
         fd_config = FDConfig(
@@ -58,7 +60,8 @@ class TestConfig(unittest.TestCase):
             ips="0.0.0.0",
             test_mode=True,
         )
-        assert fd_config.max_num_batched_tokens == 8192
+        if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            assert fd_config.max_num_batched_tokens == 8192
 
     def test_fdconfig_init_cache(self):
         parallel_config = ParallelConfig({})