diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index ef9a9881e..dd82470fc 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -24,7 +24,7 @@ import zmq
 from fastapi import FastAPI, Request
 from fastapi.responses import JSONResponse, Response, StreamingResponse
 from prometheus_client import CONTENT_TYPE_LATEST
-from fastdeploy.metrics.trace_util import inject_to_metadata
+from fastdeploy.metrics.trace_util import inject_to_metadata, instrument
 
 from fastdeploy.engine.args_utils import EngineArgs
 from fastdeploy.engine.engine import LLMEngine
@@ -46,6 +46,7 @@ from fastdeploy.utils import (FlexibleArgumentParser, api_server_logger,
                               console_logger, is_port_available,
                               retrive_model_from_server)
 
+
 parser = FlexibleArgumentParser()
 parser.add_argument("--port",
                     default=8000,
@@ -141,6 +142,7 @@ async def lifespan(app: FastAPI):
 
 
 app = FastAPI(lifespan=lifespan)
+instrument(app)
 
 
 # TODO 传递真实引擎值 通过pid 获取状态
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index b27beb990..ad13b0831 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -74,7 +74,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_ATTENTION_BACKEND":
     lambda: os.getenv("FD_ATTENTION_BACKEND", "APPEND_ATTN"),
 
-    # Set sampling class. "base", "base_non_truncated", "air" and "rejection" can be set currently. 
+    # Set sampling class. "base", "base_non_truncated", "air" and "rejection" can be set currently.
     "FD_SAMPLING_CLASS":
     lambda: os.getenv("FD_SAMPLING_CLASS", "base"),
 
@@ -97,6 +97,30 @@ environment_variables: dict[str, Callable[[], Any]] = {
 
     # Whether to use fastsafetensor load weight (0 or 1)
     "FD_USE_FASTSAFETENSOR":
     lambda: os.getenv("FD_USE_FASTSAFETENSOR", "0"),
+
+    # Whether to enable trace collection ("true" to enable).
+    "TRACES_ENABLE":
+    lambda: os.getenv("TRACES_ENABLE", "false"),
+
+    # Set trace service name.
+    "FD_SERVICE_NAME":
+    lambda: os.getenv("FD_SERVICE_NAME", "FastDeploy"),
+
+    # Set trace host name.
+    "FD_HOST_NAME":
+    lambda: os.getenv("FD_HOST_NAME", "localhost"),
+
+    # Set trace exporter ("console" or "otlp").
+    "TRACES_EXPORTER":
+    lambda: os.getenv("TRACES_EXPORTER", "console"),
+
+    # Set trace exporter OTLP endpoint.
+    "EXPORTER_OTLP_ENDPOINT":
+    lambda: os.getenv("EXPORTER_OTLP_ENDPOINT"),
+
+    # Set trace exporter OTLP headers.
+    "EXPORTER_OTLP_HEADERS":
+    lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
 }
diff --git a/fastdeploy/metrics/trace_util.py b/fastdeploy/metrics/trace_util.py
index 2ed6fd812..576e284a2 100644
--- a/fastdeploy/metrics/trace_util.py
+++ b/fastdeploy/metrics/trace_util.py
@@ -1,15 +1,83 @@
 from opentelemetry.propagate import inject, extract
 from opentelemetry import trace
-
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
+from fastapi import FastAPI
+from fastdeploy.utils import llm_logger
+from fastdeploy import envs
 import json
-import os
-# create global OpenTelemetry tracer
-tracer = trace.get_tracer(__name__)
 
 # OpenTelemetry Trace context store in metadata
 TRACE_CARRIER = "trace_carrier"
 
+traces_enable = False
+tracer = trace.get_tracer(__name__)
+
+def set_up():
+    try:
+        # when TRACES_ENABLE=true start tracing
+        global traces_enable
+        traces_enable = envs.TRACES_ENABLE.lower() == "true"
+        if not traces_enable:
+            llm_logger.warning("Opentelemetry is DISABLED.")
+            return
+
+        llm_logger.info("Opentelemetry is ENABLED, configuring...")
+        # --- read env ---
+        service_name = envs.FD_SERVICE_NAME
+        host_name = envs.FD_HOST_NAME
+        # --- set attributes (Service Name, Host Name, etc.) ---
+        resource_attributes = {
+            "service.name": service_name
+        }
+        if host_name:
+            resource_attributes["host.name"] = host_name
+
+        resource = Resource(attributes=resource_attributes)
+
+        # --- set Exporter ---
+        exporter_type = envs.TRACES_EXPORTER.lower()
+        if exporter_type == "otlp":
+            endpoint = envs.EXPORTER_OTLP_ENDPOINT  # should be set
+            headers = envs.EXPORTER_OTLP_HEADERS  # e.g., "Authentication=***,k2=v2"
+
+            otlp_exporter = OTLPSpanExporter(
+                endpoint=endpoint,
+                headers=dict(item.split("=") for item in headers.split(",")) if headers else None
+            )
+            processor = BatchSpanProcessor(otlp_exporter)
+            llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
+        else:  # default console
+            processor = BatchSpanProcessor(ConsoleSpanExporter())
+            llm_logger.info("Using Console Exporter.")
+
+        # --- set Tracer Provider ---
+        provider = TracerProvider(resource=resource)
+        provider.add_span_processor(processor)
+        trace.set_tracer_provider(provider)
+        global tracer
+        tracer = trace.get_tracer(__name__)
+    except Exception:
+        llm_logger.error("set_up failed")
+        pass
+
+def instrument(app: FastAPI):
+    try:
+        set_up()
+        if traces_enable:
+            llm_logger.info("Applying instrumentors...")
+            FastAPIInstrumentor.instrument_app(app)
+    except Exception:
+        llm_logger.error("instrument failed")
+        pass
+
+
+
 
 def inject_to_metadata(request, metadata_attr='metadata'):
     """
     Inject OpenTelemetry trace context into the metadata field of the request.
@@ -28,9 +96,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
     - If there is no metadata attribute in the request, an empty dict will be created for it as its attribute
     """
     try:
-        if request is None:
-            return
-        if is_opentelemetry_instrumented() == False:
+        if request is None or not traces_enable:
             return
         metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
 
@@ -48,6 +114,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
     except:
         pass
 
+
 def extract_from_metadata(request, metadata_attr='metadata'):
     """
     Extract trace context from metadata of request object (dict or class instance).
@@ -74,7 +141,7 @@ def extract_from_metadata(request, metadata_attr='metadata'):
         return ctx
     except:
         return None
-    
+
 
 def extract_from_request(request):
     """
@@ -100,12 +167,13 @@ def extract_from_request(request):
     except:
         return None
 
+
 def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
     """
     just start a new span in request trace context
     """
     try:
-        if is_opentelemetry_instrumented() == False:
+        if not traces_enable:
             return
         # extract Trace context from request.metadata.trace_carrier
         ctx = extract_from_metadata(request)
@@ -114,31 +182,17 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
     except:
         pass
 
+
 def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
     """
     just start a new span in request trace context
     """
     try:
-        if is_opentelemetry_instrumented() == False:
+        if not traces_enable:
             return
         # extract Trace context from request.metadata.trace_carrier
         ctx = extract_from_request(request)
         with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
             pass
     except:
-        pass
-
-def is_opentelemetry_instrumented() -> bool:
-    """
-    check OpenTelemetry is start or not
-    """
-    try:
-        return (
-            os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
-            or os.getenv("OTEL_SERVICE_NAME") is not None
-            or os.getenv("OTEL_TRACES_EXPORTER") is not None
-        )
-    except Exception:
-        return False
-    
-
+        pass
\ No newline at end of file