[PD Disaggregation] Add timestamp for analyzing splitwise deployment (#5317)

* Add timestamp for analyzing splitwise deployment

* up

* up

* up

* up

* up

* up

* fix format

* fix
This commit is contained in:
Juncai
2025-12-08 10:08:44 +08:00
committed by GitHub
parent 0c66163dfd
commit 80efe98f8d
21 changed files with 287 additions and 188 deletions

View File

@@ -362,7 +362,6 @@ class EngineService:
tasks.remove(tmp_task)
for item in tasks:
item.schedule_start_time = time.time()
trace_print(LoggingEventName.RESOURCE_ALLOCATE_START, item.request_id, getattr(item, "user", ""))
available_batch = np.sum(self.resource_manager.stop_flags)
if len(tasks) > available_batch:
@@ -400,7 +399,7 @@ class EngineService:
if not is_decode:
self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
for task in tasks:
task.inference_start_time = time.time()
task.metrics.inference_start_time = time.time()
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
@@ -415,7 +414,7 @@ class EngineService:
def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]):
"""
Decode insert prefilled requests into engine worker queue.
Used in v1_kvcache_scheduler.
Used in v0_kvcache_scheduler.
Args:
request_outputs: a list of RequestOutput sent by prefill instance
"""
@@ -437,6 +436,10 @@ class EngineService:
cur_req.prompt_token_ids[0] = req_out.outputs.token_ids[0]
cur_req.num_cached_tokens = req_out.num_cached_tokens
req_out.metrics.decode_recv_req_time = cur_req.metrics.decode_recv_req_time
req_out.metrics.decode_preallocate_req_time = cur_req.metrics.decode_preallocate_req_time
cur_req.metrics = req_out.metrics
cur_req.metrics.decode_inference_start_time = time.time()
if self.cfg.speculative_config.method in ["mtp"] and self.cfg.scheduler_config.splitwise_role == "decode":
cur_req.draft_token_ids = copy.deepcopy(req_out.outputs.draft_token_ids)
@@ -644,6 +647,7 @@ class EngineService:
batch=num_prefill_batch,
)
for task in tasks:
task.metrics.engine_get_req_time = time.time()
trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))
if len(tasks) == 0:
time.sleep(0.001)
@@ -706,7 +710,7 @@ class EngineService:
batch=num_prefill_batch,
)
for task in tasks:
task.schedule_start_time = time.time()
task.metrics.engine_get_req_time = time.time()
trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))
if self.cfg.scheduler_config.splitwise_role == "decode":
@@ -732,7 +736,10 @@ class EngineService:
# assure can allocate block ids in P
while not self.resource_manager.preallocate_resource_in_p(task):
time.sleep(0.005)
self.llm_logger.debug(f"P has allocated resources for request: {task.request_id}")
self.llm_logger.debug(
f"P has allocated resources and then ask D resource for request: {task.request_id}"
)
task.metrics.ask_decode_resource_start_time = time.time()
while True:
self.split_connector.send_splitwise_tasks([task], task.idx)
status, msg = self.split_connector.check_decode_allocated(task)
@@ -742,6 +749,7 @@ class EngineService:
)
time.sleep(0.05)
else:
task.metrics.ask_decode_resource_finish_time = time.time()
break
self.llm_logger.debug(f"D has allocated resource for request: {task.request_id}")
else:
@@ -749,32 +757,31 @@ class EngineService:
# assure can allocate block ids in P
while not self.resource_manager.preallocate_resource_in_p(task):
time.sleep(0.005)
self.llm_logger.debug(f"P has allocated resources for request: {task.request_id}")
self.llm_logger.debug(
f"P has allocated resources and then ask D resource for req_id: {task.request_id}"
)
task.metrics.ask_decode_resource_start_time = time.time()
self.split_connector.send_splitwise_tasks([task], task.idx)
for task in tasks:
if self.cfg.scheduler_config.splitwise_role != "mixed":
# assure fetch block ids from D
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(
f"D failed to allocate resource for request {task.request_id}, message: {msg}."
)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
else:
self.llm_logger.debug(f"D has allocated resource for request: {task.request_id}")
# assure fetch block ids from D
status, msg = self.split_connector.check_decode_allocated(task)
task.metrics.ask_decode_resource_finish_time = time.time()
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
tasks.remove(tmp_task)
# release resource in P
@@ -822,6 +829,7 @@ class EngineService:
# Fetch requests and add them to the scheduling queue
if tasks:
for task in tasks:
task.metrics.add_req_to_resource_manager_time = time.time()
trace_print(
LoggingEventName.RESOURCE_ALLOCATE_START, task.request_id, getattr(task, "user", "")
)
@@ -895,6 +903,11 @@ class EngineService:
LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
)
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
if isinstance(task, Request):
if self.cfg.scheduler_config.splitwise_role == "decode":
task.metrics.decode_inference_start_time = time.time()
else:
task.metrics.inference_start_time = time.time()
self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
# 4. Response error tasks
@@ -962,7 +975,7 @@ class EngineService:
err_msg = None
try:
request = Request.from_dict(data)
request.llm_engine_recv_req_timestamp = time.time()
request.metrics.scheduler_recv_req_time = time.time()
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
main_process_metrics.requests_number.inc()
trace_print(LoggingEventName.PREPROCESSING_END, data["request_id"], data.get("user", ""))
@@ -1132,6 +1145,8 @@ class EngineService:
self.llm_logger.debug(
f"D has received tasks to preallocate resource for tasks: {[task.request_id for task in tasks]}"
)
for task in tasks:
task.metrics.decode_recv_req_time = time.time()
allocate_resource_requests.extend(tasks)
elif isinstance(tasks[0], RequestOutput):
self.llm_logger.debug(
@@ -1141,6 +1156,7 @@ class EngineService:
tasks = [tasks]
for task in tasks:
task.finished = False
task.metrics.decode_recv_first_token_time = time.time()
prefilled_request_ouputs.extend(tasks)
def _process_allocate_resource_requests():
@@ -1150,6 +1166,8 @@ class EngineService:
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.resource_manager.preallocate_resource_in_d(task):
task.metrics.decode_preallocate_req_time = time.time()
self.llm_logger.info(f"Resource available, processing task {task.request_id}")
self.split_connector.send_cache_info_to_prefill([task])
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
processed_indices.append(idx)
@@ -1158,6 +1176,7 @@ class EngineService:
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
self.insert_tasks([task])
task.metrics.decode_preallocate_req_time = time.time()
processed_indices.append(idx)
is_success = True