Files
FastDeploy/fastdeploy/scheduler/data.py
2025-06-29 23:29:37 +00:00

217 lines
6.3 KiB
Python

"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from datetime import datetime
import time
import json
from fastdeploy.engine.request import Request, RequestOutput
class ScheduledRequest(object):
    """
    A wrapper class for Request objects with scheduling metadata.

    This class extends Request objects with:
    - Queue information for distributed scheduling
    - Timestamp tracking
    - Serialization capabilities
    """

    def __init__(self,
                 request: Request,
                 request_queue_name: str = "",
                 response_queue_name: str = ""):
        """
        Initialize a ScheduledRequest instance.

        Args:
            request: The original Request object
            request_queue_name: Name of the request queue
            response_queue_name: Name of the response queue
        """
        self.raw: Request = request
        self.request_queue_name = request_queue_name
        self.response_queue_name = response_queue_name
        # Creation time from time.time(); unserialize() replaces it with the
        # value carried in the serialized payload.
        self.schedule_time = time.time()

    def __repr__(self) -> str:
        """
        Build a one-line, human-readable summary of the request.

        Returns:
            A string with the request ID, both queue names and the schedule
            time rendered in local time as "YYYY-MM-DD HH:MM:SS.mmm"
        """
        local_time = datetime.fromtimestamp(self.schedule_time)
        # timespec="milliseconds" truncates the sub-second part instead of
        # rounding it, and puts a "." between seconds and milliseconds so the
        # two fields can no longer run together.
        formatted_time = local_time.isoformat(sep=" ",
                                              timespec="milliseconds")
        return (f"request_id:{self.request_id} request_queue:{self.request_queue_name} "
                f"response_queue:{self.response_queue_name} "
                f"schedule_time:{formatted_time}")

    @property
    def request_id(self) -> str:
        """
        Get the request ID.

        Returns:
            The request_id attribute of the wrapped request
        """
        return self.raw.request_id

    @request_id.setter
    def request_id(self, id: str):
        """
        Set the request ID.

        Args:
            id: New request identifier, written onto the wrapped request
        """
        self.raw.request_id = id

    @property
    def prompt_tokens_ids_len(self) -> int:
        """
        Get the length of prompt token IDs.

        Returns:
            len() of the wrapped request's prompt_token_ids
        """
        return len(self.raw.prompt_token_ids)

    def serialize(self) -> bytes:
        """
        Serialize the request to bytes for storage/transmission.

        The payload is a JSON object holding both queue names, the schedule
        time and the result of the wrapped request's to_dict().

        Returns:
            Serialized request data as bytes
        """
        data = {
            "request_queue_name": self.request_queue_name,
            "response_queue_name": self.response_queue_name,
            "schedule_time": self.schedule_time,
            "raw": self.raw.to_dict(),
        }
        # ensure_ascii=False keeps non-ASCII characters as-is in the JSON text
        # instead of escaping them.
        serialized_data = json.dumps(data, ensure_ascii=False)
        return serialized_data.encode()

    @classmethod
    def unserialize(cls, serialized_data: bytes) -> 'ScheduledRequest':
        """
        Deserialize bytes back into a ScheduledRequest.

        Args:
            serialized_data: Serialized request data, as produced by
                serialize()

        Returns:
            Reconstructed ScheduledRequest object

        Raises:
            KeyError: If an expected key is missing from the payload
        """
        data = json.loads(serialized_data)
        request = Request.from_dict(data["raw"])
        scheduled_request = cls(request)
        # The constructor stamps a fresh time and empty queue names; restore
        # the values that were recorded when the request was serialized.
        scheduled_request.schedule_time = data["schedule_time"]
        scheduled_request.request_queue_name = data["request_queue_name"]
        scheduled_request.response_queue_name = data["response_queue_name"]
        return scheduled_request
class ScheduledResponse(object):
    """
    Pairs a RequestOutput with scheduling metadata.

    On top of the wrapped RequestOutput this class provides:
    - A timestamp taken when the wrapper is created
    - JSON-based serialization to and from bytes
    - Shortcuts to the wrapped object's ID, output index and finished flag
    """

    def __init__(self, response: RequestOutput):
        """
        Wrap a RequestOutput and stamp it with the current time.

        Args:
            response: The RequestOutput object to wrap
        """
        self.raw: RequestOutput = response
        self.schedule_time = time.time()

    def __repr__(self):
        # Three "key:value" fields separated by single spaces.
        fields = (
            f"request_id:{self.request_id}",
            f"index:{self.index}",
            f"finished:{self.finished}",
        )
        return " ".join(fields)

    @property
    def request_id(self) -> str:
        """
        Request identifier of the wrapped response.

        Returns:
            The wrapped object's request_id
        """
        wrapped = self.raw
        return wrapped.request_id

    @request_id.setter
    def request_id(self, id: str):
        """
        Overwrite the request identifier on the wrapped response.

        Args:
            id: Identifier to store
        """
        wrapped = self.raw
        wrapped.request_id = id

    @property
    def index(self) -> int:
        """
        Output index of the wrapped response.

        Returns:
            The index attribute of the wrapped object's outputs
        """
        outputs = self.raw.outputs
        return outputs.index

    @property
    def finished(self) -> bool:
        """
        Finished flag of the wrapped response.

        Returns:
            The wrapped object's finished attribute
        """
        wrapped = self.raw
        return wrapped.finished

    def serialize(self) -> bytes:
        """
        Encode the response as JSON bytes for storage/transmission.

        Returns:
            Encoded JSON object with the keys "schedule_time" and "raw"
        """
        payload = {
            "schedule_time": self.schedule_time,
            "raw": self.raw.to_dict(),
        }
        return json.dumps(payload, ensure_ascii=False).encode()

    @classmethod
    def unserialize(cls, serialized_data: bytes) -> 'ScheduledResponse':
        """
        Rebuild a ScheduledResponse from its serialized form.

        Args:
            serialized_data: Bytes previously produced by serialize()

        Returns:
            A new ScheduledResponse carrying the stored schedule time
        """
        payload = json.loads(serialized_data)
        restored = cls(RequestOutput.from_dict(payload["raw"]))
        # Replace the construction-time stamp with the recorded one.
        restored.schedule_time = payload["schedule_time"]
        return restored