merge 2.0.2 into 2.0.3 (#2917)

Co-authored-by: shige <shige@baidu.com>
This commit is contained in:
sg263
2025-07-22 14:46:20 +08:00
committed by GitHub
parent 4dbc483713
commit 580460046f
5 changed files with 107 additions and 34 deletions

View File

@@ -58,7 +58,6 @@ class LLMEngine(object):
Attributes:
cfg (Config): Configuration object containing all the parameters.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
scheduler (LocalScheduler or GlobalScheduler): Scheduler used to dispatch tasks.
input_processor (InputPreprocessor): Preprocessor for input data.
resource_manager (ResourceManager): Manager for resource allocation.

View File

@@ -24,7 +24,7 @@ import zmq
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from prometheus_client import CONTENT_TYPE_LATEST
from fastdeploy.metrics.trace_util import inject_to_metadata
from fastdeploy.metrics.trace_util import inject_to_metadata, instrument
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine
@@ -141,6 +141,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
instrument(app)
# TODO: pass the real engine instance and obtain its status via the pid
@@ -403,11 +404,6 @@ def launch_controller_server():
f"The parameter `controller_port`:{args.controller_port} is already in use."
)
if not is_port_available(args.host, args.controller_port):
raise Exception(
f"The parameter `controller_port`:{args.controller_port} is already in use."
)
controller_server_thread = threading.Thread(target=run_controller_server,
daemon=True)
controller_server_thread.start()

View File

@@ -105,6 +105,30 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Whether to use aggregate send.
"FD_USE_AGGREGATE_SEND":
lambda: bool(int(os.getenv("FD_USE_AGGREGATE_SEND", "0"))),
# Whether to enable tracing.
"TRACES_ENABLE":
lambda: os.getenv("TRACES_ENABLE", "false"),
# Set the trace service name.
"FD_SERVICE_NAME":
lambda: os.getenv("FD_SERVICE_NAME", "FastDeploy"),
# Set the trace host name.
"FD_HOST_NAME":
lambda: os.getenv("FD_HOST_NAME", "localhost"),
# Set the trace exporter.
"TRACES_EXPORTER":
lambda: os.getenv("TRACES_EXPORTER", "console"),
# Set the trace exporter OTLP endpoint.
"EXPORTER_OTLP_ENDPOINT":
lambda: os.getenv("EXPORTER_OTLP_ENDPOINT"),
# Set the trace exporter OTLP headers.
"EXPORTER_OTLP_HEADERS":
lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
}
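For context, a minimal sketch of how these variables could be set before launching the server to send traces to an OTLP/HTTP collector; the endpoint, header values, and host name below are placeholders, not values from this change:

import os

# enable tracing and select the OTLP exporter (the defaults keep tracing off / console only)
os.environ["TRACES_ENABLE"] = "true"
os.environ["TRACES_EXPORTER"] = "otlp"
os.environ["EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318/v1/traces"  # placeholder collector
os.environ["EXPORTER_OTLP_HEADERS"] = "Authentication=placeholder-token,k2=v2"
# resource attributes attached to every span
os.environ["FD_SERVICE_NAME"] = "FastDeploy"
os.environ["FD_HOST_NAME"] = "node-0"  # placeholder host name
# set these in the environment before the server process starts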

View File

@@ -1,15 +1,83 @@
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from fastapi import FastAPI
from fastdeploy.utils import llm_logger
from fastdeploy import envs
import json
import os
# global OpenTelemetry tracer; reconfigured by set_up() once a provider is installed
tracer = trace.get_tracer(__name__)
# metadata key under which the OpenTelemetry trace context (carrier) is stored
TRACE_CARRIER = "trace_carrier"
traces_enable = False
def set_up():
try:
# tracing is enabled only when TRACES_ENABLE=true
global traces_enable
traces_enable = envs.TRACES_ENABLE.lower() == "true"
if not traces_enable:
llm_logger.warning("OpenTelemetry is DISABLED.")
return
llm_logger.info("OpenTelemetry is ENABLED, configuring...")
# --- read env ---
service_name = envs.FD_SERVICE_NAME
host_name = envs.FD_HOST_NAME
# --- set attributes (Service Name, Host Name, etc.) ---
resource_attributes = {
"service.name": service_name
}
if host_name:
resource_attributes["host.name"] = host_name
resource = Resource(attributes=resource_attributes)
# --- set Exporter ---
exporter_type = envs.TRACES_EXPORTER.lower()
if exporter_type == "otlp":
endpoint = envs.EXPORTER_OTLP_ENDPOINT # should be set
headers = envs.EXPORTER_OTLP_HEADERS # e.g., "Authentication=***,k2=v2"
otlp_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=dict(item.split("=") for item in headers.split(",")) if headers else None
)
processor = BatchSpanProcessor(otlp_exporter)
llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
else: # default console
processor = BatchSpanProcessor(ConsoleSpanExporter())
llm_logger.info("Using Console Exporter.")
# --- set Tracer Provider ---
provider = TracerProvider(resource=resource)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
global tracer
tracer = trace.get_tracer(__name__)
except Exception as e:
llm_logger.error(f"set_up failed: {e}")
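One detail worth noting: the OTLP headers string is parsed with a plain split, so each entry must be exactly key=value, separated by commas. A quick illustration (values are placeholders):

headers = "Authentication=placeholder-token,k2=v2"
parsed = dict(item.split("=") for item in headers.split(","))
# {'Authentication': 'placeholder-token', 'k2': 'v2'}
# values containing '=' or ',' would break this simple parse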
def instrument(app: FastAPI):
try:
set_up()
if traces_enable:
llm_logger.info("Applying instrumentors...")
FastAPIInstrumentor.instrument_app(app)
except Exception as e:
llm_logger.warning(f"instrument failed: {e}")
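As a usage reference, this mirrors the api_server.py change above; a minimal sketch of wiring instrument() into a FastAPI app (the app here is a stand-in, not the real server):

from fastapi import FastAPI
from fastdeploy.metrics.trace_util import instrument

app = FastAPI()
instrument(app)  # calls set_up() and applies FastAPIInstrumentor only when TRACES_ENABLE=true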
def inject_to_metadata(request, metadata_attr='metadata'):
"""
Inject OpenTelemetry trace context into the metadata field of the request.
@@ -28,9 +96,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
- If the request has no metadata attribute, an empty dict is created and attached to it
"""
try:
if request is None:
return
if is_opentelemetry_instrumented() == False:
if request is None or not traces_enable:
return
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
@@ -48,6 +114,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
except:
pass
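A sketch of the producer side, assuming a plain dict request as described in the docstring; the exact carrier payload comes from OpenTelemetry's inject(), so only its location is shown here:

from fastdeploy.metrics.trace_util import inject_to_metadata

request = {"prompt": "hello", "metadata": {}}
inject_to_metadata(request)
# with tracing enabled, request["metadata"]["trace_carrier"] now holds the
# serialized trace context; with tracing disabled, the request is unchanged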
def extract_from_metadata(request, metadata_attr='metadata'):
"""
Extract trace context from metadata of request object (dict or class instance).
@@ -100,12 +167,13 @@ def extract_from_request(request):
except:
return None
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
"""
Start a new span in the trace context carried in request.metadata.
"""
try:
if is_opentelemetry_instrumented() == False:
if not traces_enable:
return
# extract Trace context from request.metadata.trace_carrier
ctx = extract_from_metadata(request)
@@ -114,12 +182,13 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
except:
pass
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
"""
Start a new span in the trace context extracted from the request object itself.
"""
try:
if is_opentelemetry_instrumented() == False:
if not traces_enable:
return
# extract trace context from the request itself
ctx = extract_from_request(request)
@@ -127,18 +196,3 @@ def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
pass
except:
pass
def is_opentelemetry_instrumented() -> bool:
"""
check whether OpenTelemetry instrumentation is enabled or not
"""
try:
return (
os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
or os.getenv("OTEL_SERVICE_NAME") is not None
or os.getenv("OTEL_TRACES_EXPORTER") is not None
)
except Exception:
return False
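And the consumer side: a minimal sketch of re-creating spans from a request that already carries the injected context (span names are illustrative):

from opentelemetry import trace
from fastdeploy.metrics.trace_util import start_span, start_span_request

# request["metadata"]["trace_carrier"] was filled by inject_to_metadata above
start_span("enqueue_request", request, kind=trace.SpanKind.CLIENT)
# or, when the context should be read from the request object itself:
start_span_request("preprocess", request, kind=trace.SpanKind.SERVER)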

View File

@@ -36,4 +36,4 @@ opentelemetry-instrumentation-redis
opentelemetry-instrumentation-mysql
opentelemetry-distro 
opentelemetry-exporter-otlp
opentelemetry-instrumentation-fastapi