mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
[BugFix] fix workers=1 (#4364)
* [Feature] support prefix cache in DP * fix * Update common_engine.py * Update common_engine.py * Update common_engine.py * Update common_engine.py * [BugFix] fix workers more than 1 * fix * Update api_server.py * fix * Update api_server.py * fix --------- Co-authored-by: ltd0924 <luotingdan@baidu.com>
This commit is contained in:
@@ -22,13 +22,13 @@ import time
|
||||
import traceback
|
||||
from collections.abc import AsyncGenerator
|
||||
from contextlib import asynccontextmanager
|
||||
from multiprocessing import current_process
|
||||
|
||||
import uvicorn
|
||||
import zmq
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
||||
from gunicorn.app.base import BaseApplication
|
||||
from opentelemetry import trace
|
||||
from prometheus_client import CONTENT_TYPE_LATEST
|
||||
|
||||
@@ -87,6 +87,21 @@ if args.tool_parser_plugin:
|
||||
llm_engine = None
|
||||
|
||||
|
||||
class StandaloneApplication(BaseApplication):
|
||||
def __init__(self, app, options=None):
|
||||
self.application = app
|
||||
self.options = options or {}
|
||||
super().__init__()
|
||||
|
||||
def load_config(self):
|
||||
config = {key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None}
|
||||
for key, value in config.items():
|
||||
self.cfg.set(key.lower(), value)
|
||||
|
||||
def load(self):
|
||||
return self.application
|
||||
|
||||
|
||||
def load_engine():
|
||||
"""
|
||||
load engine
|
||||
@@ -95,10 +110,10 @@ def load_engine():
|
||||
if llm_engine is not None:
|
||||
return llm_engine
|
||||
|
||||
api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}")
|
||||
api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}")
|
||||
engine_args = EngineArgs.from_cli_args(args)
|
||||
engine = LLMEngine.from_engine_args(engine_args)
|
||||
if not engine.start(api_server_pid=os.getpid()):
|
||||
if not engine.start(api_server_pid=args.port):
|
||||
api_server_logger.error("Failed to initialize FastDeploy LLM engine, service exit now!")
|
||||
return None
|
||||
|
||||
@@ -119,12 +134,12 @@ def load_data_service():
|
||||
global llm_engine
|
||||
if llm_engine is not None:
|
||||
return llm_engine
|
||||
api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}")
|
||||
api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}")
|
||||
engine_args = EngineArgs.from_cli_args(args)
|
||||
config = engine_args.create_engine_config()
|
||||
api_server_logger.info(f"local_data_parallel_id: {config.parallel_config}")
|
||||
expert_service = ExpertService(config, config.parallel_config.local_data_parallel_id)
|
||||
if not expert_service.start(os.getpid(), config.parallel_config.local_data_parallel_id):
|
||||
if not expert_service.start(args.port, config.parallel_config.local_data_parallel_id):
|
||||
api_server_logger.error("Failed to initialize FastDeploy LLM expert service, service exit now!")
|
||||
return None
|
||||
llm_engine = expert_service
|
||||
@@ -136,13 +151,22 @@ async def lifespan(app: FastAPI):
|
||||
"""
|
||||
async context manager for FastAPI lifespan
|
||||
"""
|
||||
import logging
|
||||
|
||||
uvicorn_access = logging.getLogger("uvicorn.access")
|
||||
uvicorn_access.handlers.clear()
|
||||
|
||||
# 使用 gunicorn 的格式
|
||||
formatter = logging.Formatter("[%(asctime)s] [%(process)d] [INFO] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
|
||||
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
uvicorn_access.addHandler(handler)
|
||||
uvicorn_access.propagate = False
|
||||
|
||||
if args.tokenizer is None:
|
||||
args.tokenizer = args.model
|
||||
if current_process().name != "MainProcess":
|
||||
pid = os.getppid()
|
||||
else:
|
||||
pid = os.getpid()
|
||||
pid = args.port
|
||||
api_server_logger.info(f"{pid}")
|
||||
|
||||
if args.served_model_name is not None:
|
||||
@@ -449,16 +473,17 @@ def launch_api_server() -> None:
|
||||
api_server_logger.info(f"args: {args.__dict__}")
|
||||
fd_start_span("FD_START")
|
||||
|
||||
options = {
|
||||
"bind": f"{args.host}:{args.port}",
|
||||
"workers": args.workers,
|
||||
"worker_class": "uvicorn.workers.UvicornWorker",
|
||||
"loglevel": "info",
|
||||
"log_config": UVICORN_CONFIG,
|
||||
"timeout_graceful_shutdown": args.timeout_graceful_shutdown,
|
||||
}
|
||||
|
||||
try:
|
||||
uvicorn.run(
|
||||
app="fastdeploy.entrypoints.openai.api_server:app",
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
workers=args.workers,
|
||||
log_config=UVICORN_CONFIG,
|
||||
log_level="info",
|
||||
timeout_graceful_shutdown=args.timeout_graceful_shutdown,
|
||||
) # set log level to error to avoid log
|
||||
StandaloneApplication(app, options).run()
|
||||
except Exception as e:
|
||||
api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user