[Trace] Support trace log (#2864)

* add opentelemetry

* add opentelemetry

* add opentelemetry on dequeue

* add opentelemetry on dequeue

* add opentelemetry on dequeue
This commit is contained in:
sg263
2025-07-16 15:35:44 +08:00
committed by GitHub
parent e5de28bff2
commit 0d61c65de1
6 changed files with 185 additions and 7 deletions

View File

@@ -47,7 +47,8 @@ from fastdeploy.output.token_processor import (TokenProcessor,
WarmUpTokenProcessor) WarmUpTokenProcessor)
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
from fastdeploy.utils import EngineError, console_logger, llm_logger from fastdeploy.utils import EngineError, console_logger, llm_logger
from fastdeploy.metrics.trace_util import extract_from_metadata, start_span, start_span_request
from opentelemetry import trace
class LLMEngine(object): class LLMEngine(object):
""" """
@@ -56,6 +57,7 @@ class LLMEngine(object):
Attributes: Attributes:
cfg (Config): Configuration object containing all the parameters. cfg (Config): Configuration object containing all the parameters.
cached_generated_tokens (queue.Queue): Queue to store generated tokens. cached_generated_tokens (queue.Queue): Queue to store generated tokens.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks. scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks.
input_processor (InputPreprocessor): Preprocessor for input data. input_processor (InputPreprocessor): Preprocessor for input data.
resource_manager (ResourceManager): Manager for resource allocation. resource_manager (ResourceManager): Manager for resource allocation.
@@ -375,7 +377,10 @@ class LLMEngine(object):
request, insert_task = None, [] request, insert_task = None, []
results: List[Tuple[str, Optional[str]]] = list() results: List[Tuple[str, Optional[str]]] = list()
if data: if data:
request = Request.from_dict(data) request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
llm_logger.debug(f"Receive request: {request}") llm_logger.debug(f"Receive request: {request}")
err_msg = None err_msg = None
@@ -706,6 +711,8 @@ class LLMEngine(object):
""" """
Insert tasks to engine. Insert tasks to engine.
""" """
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
# TODO 返回至 scheduler # TODO 返回至 scheduler
if allocated: if allocated:
current_tasks = [] current_tasks = []

View File

@@ -55,7 +55,8 @@ class Request:
guided_grammar: Optional[Any] = None, guided_grammar: Optional[Any] = None,
structural_tag: Optional[Any] = None, structural_tag: Optional[Any] = None,
guided_json_object: Optional[bool] = None, guided_json_object: Optional[bool] = None,
enable_thinking: Optional[bool] = True) -> None: enable_thinking: Optional[bool] = True,
trace_carrier: dict = dict()) -> None:
self.request_id = request_id self.request_id = request_id
self.prompt = prompt self.prompt = prompt
self.prompt_token_ids = prompt_token_ids self.prompt_token_ids = prompt_token_ids
@@ -91,6 +92,7 @@ class Request:
self.multimodal_data = multimodal_data self.multimodal_data = multimodal_data
self.enable_thinking = enable_thinking self.enable_thinking = enable_thinking
self.trace_carrier = trace_carrier
@classmethod @classmethod
def from_dict(cls, d: dict): def from_dict(cls, d: dict):
@@ -120,7 +122,8 @@ class Request:
guided_grammar=d.get("guided_grammar", None), guided_grammar=d.get("guided_grammar", None),
structural_tag=d.get("structural_tag", None), structural_tag=d.get("structural_tag", None),
guided_json_object=d.get("guided_json_object", None), guided_json_object=d.get("guided_json_object", None),
enable_thinking=d.get("enable_thinking", True)) enable_thinking=d.get("enable_thinking", True),
trace_carrier=d.get("trace_carrier", {}))
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""convert Request into a serializable dict """ """convert Request into a serializable dict """
@@ -142,7 +145,8 @@ class Request:
"raw_request": self.raw_request, "raw_request": self.raw_request,
"disaggregate_info": self.disaggregate_info, "disaggregate_info": self.disaggregate_info,
"draft_token_ids": self.draft_token_ids, "draft_token_ids": self.draft_token_ids,
"enable_thinking": self.enable_thinking "enable_thinking": self.enable_thinking,
"trace_carrier": self.trace_carrier
} }
add_params = [ add_params = [
"guided_json", "guided_regex", "guided_choice", "guided_grammar", "guided_json", "guided_regex", "guided_choice", "guided_grammar",

View File

@@ -24,6 +24,7 @@ import zmq
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse from fastapi.responses import JSONResponse, Response, StreamingResponse
from prometheus_client import CONTENT_TYPE_LATEST from prometheus_client import CONTENT_TYPE_LATEST
from fastdeploy.metrics.trace_util import inject_to_metadata
from fastdeploy.engine.args_utils import EngineArgs from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine from fastdeploy.engine.engine import LLMEngine
@@ -210,6 +211,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
return JSONResponse( return JSONResponse(
content={"error": "Worker Service Not Healthy"}, content={"error": "Worker Service Not Healthy"},
status_code=304) status_code=304)
inject_to_metadata(request)
generator = await app.state.chat_handler.create_chat_completion(request) generator = await app.state.chat_handler.create_chat_completion(request)
if isinstance(generator, ErrorResponse): if isinstance(generator, ErrorResponse):

View File

@@ -0,0 +1,144 @@
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
import json
import os
# create global OpenTelemetry tracer
tracer = trace.get_tracer(__name__)
# OpenTelemetry Trace context store in metadata
TRACE_CARRIER = "trace_carrier"
def inject_to_metadata(request, metadata_attr='metadata'):
    """
    Inject the current OpenTelemetry trace context into the request's metadata.

    Parameters:
        request: a dict or an object carrying a metadata field/attribute.
        metadata_attr: name of the metadata field, default 'metadata'.

    Behavior:
        - No-op when request is None or OpenTelemetry is not instrumented.
        - Creates an empty metadata container on the request if it is missing.
        - Serializes the injected carrier to JSON and stores it under the
          TRACE_CARRIER key.

    Note:
        Best-effort and non-blocking: any failure is silently ignored so that
        tracing can never break request handling.
    """
    try:
        if request is None:
            return
        if not is_opentelemetry_instrumented():
            return

        if isinstance(request, dict):
            metadata = request.get(metadata_attr)
        else:
            metadata = getattr(request, metadata_attr, None)

        if metadata is None:
            # Mount a fresh metadata container on the request so the carrier
            # has somewhere to live.
            metadata = {}
            if isinstance(request, dict):
                request[metadata_attr] = metadata
            else:
                setattr(request, metadata_attr, metadata)

        carrier = {}
        inject(carrier)  # fills carrier with the active span's propagation headers
        metadata[TRACE_CARRIER] = json.dumps(carrier)
    except Exception:
        # Deliberately best-effort, but not a bare except: SystemExit and
        # KeyboardInterrupt must still propagate.
        pass
def extract_from_metadata(request, metadata_attr='metadata'):
    """
    Extract an OpenTelemetry trace context from the metadata of a request
    (dict or class instance).

    Parameters:
        request: a dict or an object carrying a metadata field/attribute.
        metadata_attr: name of the metadata field, default 'metadata'.

    Returns:
        The extracted OpenTelemetry Context object on success, or None when
        the metadata/carrier is absent or malformed.
    """
    try:
        if isinstance(request, dict):
            metadata = request.get(metadata_attr)
        else:
            metadata = getattr(request, metadata_attr, None)
        if metadata is None:
            return None

        carrier_json = metadata.get(TRACE_CARRIER)
        if carrier_json is None:
            return None

        return extract(json.loads(carrier_json))
    except Exception:
        # Best-effort: a malformed carrier simply yields no parent context.
        return None
def extract_from_request(request):
    """
    Extract an OpenTelemetry trace context from the trace_carrier attribute
    of a request object.

    Parameters:
        request: an object exposing a TRACE_CARRIER attribute holding the
            JSON-serialized propagation carrier.

    Returns:
        The extracted OpenTelemetry Context object on success, or None when
        the carrier is absent or malformed.
    """
    try:
        carrier_json = getattr(request, TRACE_CARRIER, None)
        if carrier_json is None:
            return None
        return extract(json.loads(carrier_json))
    except Exception:
        # Best-effort: a malformed carrier simply yields no parent context.
        return None
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Open (and immediately close) a span parented to the trace context stored
    in the request's metadata, recording the event on the distributed trace.

    Parameters:
        span_name: name of the span to record.
        request: dict or object whose metadata holds a TRACE_CARRIER entry.
        kind: OpenTelemetry SpanKind, default CLIENT.

    No-op when OpenTelemetry is not instrumented; all errors are swallowed.
    """
    try:
        if not is_opentelemetry_instrumented():
            return
        # Extract the parent trace context from request.metadata[TRACE_CARRIER].
        ctx = extract_from_metadata(request)
        # The span has no body: entering and exiting it is enough to record
        # the named event on the parent trace.
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
            pass
    except Exception:
        # Tracing must never break the request path.
        pass
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Open (and immediately close) a span parented to the trace context stored
    directly on the request's trace_carrier attribute, recording the event
    on the distributed trace.

    Parameters:
        span_name: name of the span to record.
        request: object exposing a TRACE_CARRIER attribute.
        kind: OpenTelemetry SpanKind, default CLIENT.

    No-op when OpenTelemetry is not instrumented; all errors are swallowed.
    """
    try:
        if not is_opentelemetry_instrumented():
            return
        # Extract the parent trace context from request.trace_carrier.
        ctx = extract_from_request(request)
        # The span has no body: entering and exiting it is enough to record
        # the named event on the parent trace.
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
            pass
    except Exception:
        # Tracing must never break the request path.
        pass
def is_opentelemetry_instrumented() -> bool:
    """
    Report whether OpenTelemetry auto-instrumentation appears to be active,
    based on the standard OTEL_* environment variables.

    Returns:
        True when any of the recognized OTEL environment variables is set,
        False otherwise (including on any unexpected error).
    """
    try:
        return (
            # The correctly-spelled standard variable ...
            os.getenv("OTEL_PYTHON_DISABLED_INSTRUMENTATIONS") is not None
            # ... plus the historical misspelling, kept for backward
            # compatibility with existing deployments.
            or os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
            or os.getenv("OTEL_SERVICE_NAME") is not None
            or os.getenv("OTEL_TRACES_EXPORTER") is not None
        )
    except Exception:
        return False

View File

@@ -29,3 +29,9 @@ triton==3.3
use-triton-in-paddle use-triton-in-paddle
crcmod crcmod
fastsafetensors==0.1.14 fastsafetensors==0.1.14
opentelemetry-api>=1.24.0
opentelemetry-sdk>=1.24.0
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-mysql
opentelemetry-distro 
opentelemetry-exporter-otlp

View File

@@ -19,6 +19,9 @@ import re
import sys import sys
import paddle import paddle
import subprocess import subprocess
from setuptools import setup
from setuptools.command.install import install
from pathlib import Path
from pathlib import Path from pathlib import Path
from setuptools import Extension, find_packages, setup from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext from setuptools.command.build_ext import build_ext
@@ -137,8 +140,17 @@ class CMakeBuild(build_ext):
cwd=build_temp, cwd=build_temp,
check=True) check=True)
subprocess.run(["cmake", "--build", ".", *build_args], subprocess.run(["cmake", "--build", ".", *build_args],
cwd=build_temp, cwd=build_temp,
check=True) check=True)
class PostInstallCommand(install):
"""在标准安装完成后执行自定义命令"""
def run(self):
# 先执行标准安装步骤
install.run(self)
# 执行自定义命令
subprocess.check_call(["opentelemetry-bootstrap", "-a", "install"])
subprocess.check_call(["pip", "install", "opentelemetry-instrumentation-fastapi"])
def load_requirements(): def load_requirements():
"""Load dependencies from requirements.txt""" """Load dependencies from requirements.txt"""
@@ -169,6 +181,8 @@ def get_name():
cmdclass_dict = {'bdist_wheel': CustomBdistWheel} cmdclass_dict = {'bdist_wheel': CustomBdistWheel}
cmdclass_dict['build_ext'] = CMakeBuild cmdclass_dict['build_ext'] = CMakeBuild
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev")
cmdclass_dict['build_optl'] = PostInstallCommand
setup( setup(
name=get_name(), name=get_name(),
@@ -211,3 +225,4 @@ setup(
python_requires=">=3.7", python_requires=">=3.7",
extras_require={"test": ["pytest>=6.0"]}, extras_require={"test": ["pytest>=6.0"]},
) )