Feature: Add support for Pooling Model Embedding and provide an OpenAI-compatible API. (#4344)

* feat: add OpenAIServing

* feat: add ZmqOpenAIServing & OpenAIServingEmbedding

* feat: Refine the basic ServingEngine class and introduce ServingContext

* fix: codestyle

* fix: request

* fix: pooling_params

* feat: _process_chat_template_kwargs

* feat: support batch request

* feat: pooling_params verify & default parameters
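Below is a minimal usage sketch for the embeddings API described in the list above. The host, port, and model name are placeholders, not values from this PR; because the endpoint is OpenAI-compatible, the stock openai Python client can call it:

import openai

# Point the official OpenAI client at a local FastDeploy server (placeholder URL).
client = openai.OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

# Batched input exercises the "support batch request" change above.
resp = client.embeddings.create(
    model="my-pooling-model",  # placeholder model name
    input=["first text", "second text"],
)
print(len(resp.data), len(resp.data[0].embedding))  # batch size, embedding dim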

---------

Co-authored-by: sunlei1024 <sunlei1024@example.com>
Author: SunLei
Date: 2025-10-15 19:42:59 +08:00
Committed by: GitHub
Parent: 744287e1a9
Commit: b4b579a7ed
12 changed files with 971 additions and 55 deletions


@@ -43,12 +43,14 @@ from fastdeploy.entrypoints.openai.protocol import (
     CompletionRequest,
     CompletionResponse,
     ControlSchedulerRequest,
+    EmbeddingRequest,
     ErrorInfo,
     ErrorResponse,
     ModelList,
 )
 from fastdeploy.entrypoints.openai.serving_chat import OpenAIServingChat
 from fastdeploy.entrypoints.openai.serving_completion import OpenAIServingCompletion
+from fastdeploy.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
 from fastdeploy.entrypoints.openai.serving_models import ModelPath, OpenAIServingModels
 from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
 from fastdeploy.entrypoints.openai.utils import UVICORN_CONFIG, make_arg_parser
@@ -86,6 +88,9 @@ if args.tool_parser_plugin:
     ToolParserManager.import_tool_parser(args.tool_parser_plugin)

 llm_engine = None

+MAX_CONCURRENT_CONNECTIONS = (args.max_concurrency + args.workers - 1) // args.workers
+connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)
+

 class StandaloneApplication(BaseApplication):
     def __init__(self, app, options=None):
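The MAX_CONCURRENT_CONNECTIONS expression added above is integer ceiling division: the global max_concurrency budget is split across workers so that the per-worker caps always sum to at least the global limit. A standalone illustration (the numbers are made up):

# Ceiling division distributes a global concurrency budget across workers.
max_concurrency, workers = 100, 8
per_worker = (max_concurrency + workers - 1) // workers  # ceil(100 / 8) = 13
assert per_worker * workers >= max_concurrency  # 13 * 8 = 104 >= 100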
@@ -121,12 +126,6 @@ def load_engine():
     return engine

 app = FastAPI()

-MAX_CONCURRENT_CONNECTIONS = (args.max_concurrency + args.workers - 1) // args.workers
-connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)

 def load_data_service():
     """
     load data service
@@ -220,11 +219,24 @@ async def lifespan(app: FastAPI):
         args.ips,
         args.max_waiting_time,
     )
+    engine_args = EngineArgs.from_cli_args(args)
+    config = engine_args.create_engine_config(port_availability_check=False)
+    embedding_handler = OpenAIServingEmbedding(
+        engine_client,
+        app.state.model_handler,
+        config,
+        pid,
+        args.ips,
+        args.max_waiting_time,
+        chat_template,
+    )
+
     engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
     engine_client.pid = pid
     app.state.engine_client = engine_client
     app.state.chat_handler = chat_handler
     app.state.completion_handler = completion_handler
+    app.state.embedding_handler = embedding_handler
     global llm_engine
     if llm_engine is not None:
         llm_engine.engine.data_processor = engine_client.data_processor
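For context on the wiring above: objects built during the FastAPI lifespan are parked on app.state, the framework's standard place to share per-application state with route functions. A generic, self-contained sketch of that pattern (DummyHandler is a stand-in, not FastDeploy code):

from contextlib import asynccontextmanager
from fastapi import FastAPI

class DummyHandler:
    """Stand-in for a handler such as OpenAIServingEmbedding."""
    async def create_embedding(self, request):
        return {"object": "list", "data": []}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Build handlers once at startup and store them on app.state.
    app.state.embedding_handler = DummyHandler()
    yield  # teardown (e.g. closing ZMQ clients) would follow here

app = FastAPI(lifespan=lifespan)

@app.post("/v1/embeddings")
async def create_embedding():
    # Route functions read the shared handler back off app.state.
    return await app.state.embedding_handler.create_embedding(None)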
@@ -434,6 +446,20 @@ async def list_models() -> Response:
     return JSONResponse(content=models.model_dump())

+
+@app.post("/v1/embeddings")
+async def create_embedding(request: EmbeddingRequest):
+    """
+    Create embeddings for the input texts
+    """
+    if app.state.dynamic_load_weight:
+        status, msg = app.state.engine_client.is_workers_alive()
+        if not status:
+            return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
+    generator = await app.state.embedding_handler.create_embedding(request)
+    return JSONResponse(content=generator.model_dump())
+
+
 @app.get("/update_model_weight")
 def update_model_weight(request: Request) -> Response:
     """