[Feature] Support mixed deployment with yiyan adapter (#3533)

* [Feature] Support mixed deployment with yiyan adapter

* [Feature] Support mixed deployment with yiyan adapter

* fix merge

---------

Co-authored-by: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
chenjian
2025-08-23 09:56:47 +08:00
committed by GitHub
parent 8b9f167ccc
commit e8af92aab7
8 changed files with 494 additions and 175 deletions

View File

@@ -47,12 +47,14 @@ from fastdeploy.inter_communicator import (
EngineCacheQueue,
EngineWorkerQueue,
IPCSignal,
ZmqClient,
ZmqIpcServer,
ZmqTcpServer,
)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.trace_util import start_span, start_span_request
from fastdeploy.model_executor.guided_decoding import schema_checker
from fastdeploy.output.token_processor import TokenProcessor, WarmUpTokenProcessor
from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger
@@ -181,9 +183,19 @@ class LLMEngine:
self.data_processor = self.input_processor.create_processor()
if api_server_pid is not None:
self.zmq_server = ZmqClient(name=api_server_pid, mode=zmq.PULL)
self.zmq_server.start_server()
self.zmq_server.create_router()
if envs.FD_ENABLE_INTERNAL_ADAPTER:
self.recv_request_server = ZmqTcpServer(port=envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, mode=zmq.PULL)
self.send_response_server = ZmqTcpServer(port=envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, mode=zmq.ROUTER)
self.external_adapter = InternalAdapter(
cfg=self.cfg, engine=self, dp_rank=self.cfg.node_rank * self.cfg.worker_num_per_node
)
else:
self.recv_request_server = ZmqIpcServer(name=api_server_pid, mode=zmq.PULL)
self.send_response_server = ZmqIpcServer(name=api_server_pid, mode=zmq.ROUTER)
self.recv_result_handle_thread = threading.Thread(
target=self.send_response_server.recv_result_handle, daemon=True
)
self.recv_result_handle_thread.start()
time.sleep(3)
if self.do_profile == 0 and (
@@ -293,7 +305,7 @@ class LLMEngine:
time.sleep(0.005)
continue
for request_id, contents in results.items():
self.zmq_server.send_multipart(request_id, contents)
self.send_response_server.send_response(request_id, contents)
except Exception as e:
llm_logger.error(f"Unexcepted error happend: {e}, {traceback.format_exc()!s}")
@@ -422,9 +434,9 @@ class LLMEngine:
try:
block = True if len(added_requests) == 0 else False
if not self.cfg.enable_mm:
err, data = self.zmq_server.receive_json_once(block)
err, data = self.recv_request_server.receive_json_once(block)
else:
err, data = self.zmq_server.receive_pyobj_once(block)
err, data = self.recv_request_server.receive_pyobj_once(block)
if err is not None:
llm_logger.error("Engine stops inserting zmq task into scheduler, err:{err}")
break
@@ -472,7 +484,7 @@ class LLMEngine:
)
# Since the request is not in scheduler
# Send result by zmq directly
self.zmq_server.send_multipart(request_id, error_result)
self.send_response_server.send_response(request_id, [error_result])
except Exception as e:
llm_logger.error(
f"Error happend while receving new request from zmq, details={e}, "
@@ -1009,8 +1021,12 @@ class LLMEngine:
print(f"Error extracting sub services: {e}")
self.engine_worker_queue.cleanup()
if hasattr(self, "zmq_server") and self.zmq_server is not None:
self.zmq_server.close()
if hasattr(self, "send_response_server") and self.send_response_server is not None:
self.send_response_server.close()
if hasattr(self, "recv_request_server") and self.recv_request_server is not None:
self.recv_request_server.close()
if hasattr(self, "recv_control_cmd_server") and self.recv_control_cmd_server is not None:
self.recv_control_cmd_server.close()
if hasattr(self, "dp_processed"):
for p in self.dp_processed:
p.join()