[Trace] add opentelemetry (#2852)

* add opentelemetry

* add opentelemetry

* add opentelemetry on dequeue

* add opentelemetry on dequeue

* add opentelemetry on dequeue

---------

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
sg263
2025-07-16 15:33:25 +08:00
committed by GitHub
parent dda4a9f848
commit 42b80182e0
6 changed files with 181 additions and 5 deletions

View File

@@ -47,7 +47,8 @@ from fastdeploy.output.token_processor import (TokenProcessor,
WarmUpTokenProcessor)
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
from fastdeploy.utils import EngineError, console_logger, llm_logger
from fastdeploy.metrics.trace_util import extract_from_metadata, start_span, start_span_request
from opentelemetry import trace
class LLMEngine(object):
"""
@@ -56,6 +57,7 @@ class LLMEngine(object):
Attributes:
cfg (Config): Configuration object containing all the parameters.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks.
input_processor (InputPreprocessor): Preprocessor for input data.
resource_manager (ResourceManager): Manager for resource allocation.
@@ -356,6 +358,9 @@ class LLMEngine(object):
results: List[Tuple[str, Optional[str]]] = list()
if data:
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
llm_logger.debug(f"Receive request: {request}")
err_msg = None
@@ -686,6 +691,8 @@ class LLMEngine(object):
"""
Insert tasks to engine.
"""
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
# TODO 返回至 scheduler
if allocated:
current_tasks = []

View File

@@ -55,7 +55,8 @@ class Request:
guided_grammar: Optional[Any] = None,
structural_tag: Optional[Any] = None,
guided_json_object: Optional[bool] = None,
enable_thinking: Optional[bool] = True) -> None:
enable_thinking: Optional[bool] = True,
trace_carrier: dict = dict()) -> None:
self.request_id = request_id
self.prompt = prompt
self.prompt_token_ids = prompt_token_ids
@@ -91,6 +92,7 @@ class Request:
self.multimodal_data = multimodal_data
self.enable_thinking = enable_thinking
self.trace_carrier = trace_carrier
@classmethod
def from_dict(cls, d: dict):
@@ -120,7 +122,8 @@ class Request:
guided_grammar=d.get("guided_grammar", None),
structural_tag=d.get("structural_tag", None),
guided_json_object=d.get("guided_json_object", None),
enable_thinking=d.get("enable_thinking", True))
enable_thinking=d.get("enable_thinking", True),
trace_carrier=d.get("trace_carrier", {}))
def to_dict(self) -> dict:
"""convert Request into a serializable dict """
@@ -142,7 +145,8 @@ class Request:
"raw_request": self.raw_request,
"disaggregate_info": self.disaggregate_info,
"draft_token_ids": self.draft_token_ids,
"enable_thinking": self.enable_thinking
"enable_thinking": self.enable_thinking,
"trace_carrier": self.trace_carrier
}
add_params = [
"guided_json", "guided_regex", "guided_choice", "guided_grammar",

View File

@@ -24,6 +24,7 @@ import zmq
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from prometheus_client import CONTENT_TYPE_LATEST
from fastdeploy.metrics.trace_util import inject_to_metadata
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine
@@ -210,6 +211,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
return JSONResponse(
content={"error": "Worker Service Not Healthy"},
status_code=304)
inject_to_metadata(request)
generator = await app.state.chat_handler.create_chat_completion(request)
if isinstance(generator, ErrorResponse):

View File

@@ -0,0 +1,144 @@
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
import json
import os
# create global OpenTelemetry tracer
tracer = trace.get_tracer(__name__)
# OpenTelemetry Trace context store in metadata
TRACE_CARRIER = "trace_carrier"
def inject_to_metadata(request, metadata_attr='metadata'):
"""
Inject OpenTelemetry trace context into the metadata field of the request.
Parameters:
request: can be a dict or object, with metadata attributes or fields.
metadata_attr: the field name of metadata, default is 'metadata'.
Operation:
- If metadata does not exist, create a new one and mount it on the request.
- Inject the current trace context as a JSON string and store it in metadata.
- Use the key TRACE_CARRIER to store the injected content.
Note:
- This function is a non-blocking operation, and errors are silently ignored.
- If there is no metadata attribute in the request, an empty dict will be created for it as its attribute
"""
try:
if request is None:
return
if is_opentelemetry_instrumented() == False:
return
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
if metadata is None:
metadata = {}
if isinstance(request, dict):
request[metadata_attr] = metadata
else:
setattr(request, metadata_attr, metadata)
trace_carrier = {}
inject(trace_carrier)
trace_carrier_json_string = json.dumps(trace_carrier)
metadata[TRACE_CARRIER] = trace_carrier_json_string
except:
pass
def extract_from_metadata(request, metadata_attr='metadata'):
"""
Extract trace context from metadata of request object (dict or class instance).
Parameters:
request: can be a dictionary or any object, containing metadata attributes or fields.
metadata_attr: metadata field name, default is 'metadata'.
Returns:
- Extraction success: returns OpenTelemetry context object (Context)
- Extraction failure or exception: returns None
"""
try:
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
if metadata is None:
return None
trace_carrier_json_string = metadata.get(TRACE_CARRIER)
if trace_carrier_json_string is None:
return None
trace_carrier = json.loads(trace_carrier_json_string)
ctx = extract(trace_carrier)
return ctx
except:
return None
def extract_from_request(request):
"""
Extract trace context from trace_carrier of request object (dict or class instance).
Parameters:
request: can be a dictionary or any object, containing metadata attributes or fields.
metadata_attr: metadata field name, default is 'metadata'.
Returns:
- Extraction success: returns OpenTelemetry context object (Context)
- Extraction failure or exception: returns None
"""
try:
trace_carrier_info = getattr(request, TRACE_CARRIER, None)
if trace_carrier_info is None:
return None
trace_carrier = json.loads(trace_carrier_info)
ctx = extract(trace_carrier)
return ctx
except:
return None
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
"""
just start a new span in request trace context
"""
try:
if is_opentelemetry_instrumented() == False:
return
# extract Trace context from request.metadata.trace_carrier
ctx = extract_from_metadata(request)
with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
pass
except:
pass
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
"""
just start a new span in request trace context
"""
try:
if is_opentelemetry_instrumented() == False:
return
# extract Trace context from request.metadata.trace_carrier
ctx = extract_from_request(request)
with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
pass
except:
pass
def is_opentelemetry_instrumented() -> bool:
"""
check OpenTelemetry is start or not
"""
try:
return (
os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
or os.getenv("OTEL_SERVICE_NAME") is not None
or os.getenv("OTEL_TRACES_EXPORTER") is not None
)
except Exception:
return False

View File

@@ -29,3 +29,9 @@ triton==3.3
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
opentelemetry-api>=1.24.0
opentelemetry-sdk>=1.24.0
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-mysql
opentelemetry-distro 
opentelemetry-exporter-otlp

View File

@@ -19,6 +19,9 @@ import re
import sys
import paddle
import subprocess
from setuptools import setup
from setuptools.command.install import install
from pathlib import Path
from pathlib import Path
from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext
@@ -140,6 +143,14 @@ class CMakeBuild(build_ext):
cwd=build_temp,
check=True)
class PostInstallCommand(install):
"""在标准安装完成后执行自定义命令"""
def run(self):
# 先执行标准安装步骤
install.run(self)
# 执行自定义命令
subprocess.check_call(["opentelemetry-bootstrap", "-a", "install"])
subprocess.check_call(["pip", "install", "opentelemetry-instrumentation-fastapi"])
def load_requirements():
"""Load dependencies from requirements.txt"""
@@ -183,6 +194,7 @@ def get_name():
cmdclass_dict = {'bdist_wheel': CustomBdistWheel}
cmdclass_dict['build_ext'] = CMakeBuild
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev")
cmdclass_dict['build_optl'] = PostInstallCommand
setup(
name=get_name(),
@@ -226,3 +238,4 @@ setup(
python_requires=">=3.7",
extras_require={"test": ["pytest>=6.0"]},
)