From 0d61c65de15050a11e9b7cf3fb0331ce75fff405 Mon Sep 17 00:00:00 2001 From: sg263 <1021937542@qq.com> Date: Wed, 16 Jul 2025 15:35:44 +0800 Subject: [PATCH] [Trace] Support trace log (#2864) * add opentelemetry * add opentelemetry * add opentelemetry on dequeue * add opentelemetry on dequeue * add opentelemetry on dequeue --- fastdeploy/engine/engine.py | 11 +- fastdeploy/engine/request.py | 10 +- fastdeploy/entrypoints/openai/api_server.py | 2 + fastdeploy/metrics/trace_util.py | 144 ++++++++++++++++++++ requirements.txt | 6 + setup.py | 19 ++- 6 files changed, 185 insertions(+), 7 deletions(-) create mode 100644 fastdeploy/metrics/trace_util.py diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 47ec60243..2a4977a1b 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -47,7 +47,8 @@ from fastdeploy.output.token_processor import (TokenProcessor, WarmUpTokenProcessor) from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector from fastdeploy.utils import EngineError, console_logger, llm_logger - +from fastdeploy.metrics.trace_util import extract_from_metadata, start_span, start_span_request +from opentelemetry import trace class LLMEngine(object): """ @@ -56,6 +57,7 @@ class LLMEngine(object): Attributes: cfg (Config): Configuration object containing all the parameters. cached_generated_tokens (queue.Queue): Queue to store generated tokens. + cached_generated_tokens (queue.Queue): Queue to store generated tokens. scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks. input_processor (InputPreprocessor): Preprocessor for input data. resource_manager (ResourceManager): Manager for resource allocation. @@ -375,7 +377,10 @@ class LLMEngine(object): request, insert_task = None, [] results: List[Tuple[str, Optional[str]]] = list() if data: - request = Request.from_dict(data) + request = Request.from_dict(data) + start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER) + + llm_logger.debug(f"Receive request: {request}") err_msg = None @@ -706,6 +711,8 @@ class LLMEngine(object): """ Insert tasks to engine. """ + for task in tasks: + start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER) # TODO 返回至 scheduler if allocated: current_tasks = [] diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index af3df438a..6520f9f47 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -55,7 +55,8 @@ class Request: guided_grammar: Optional[Any] = None, structural_tag: Optional[Any] = None, guided_json_object: Optional[bool] = None, - enable_thinking: Optional[bool] = True) -> None: + enable_thinking: Optional[bool] = True, + trace_carrier: dict = dict()) -> None: self.request_id = request_id self.prompt = prompt self.prompt_token_ids = prompt_token_ids @@ -91,6 +92,7 @@ class Request: self.multimodal_data = multimodal_data self.enable_thinking = enable_thinking + self.trace_carrier = trace_carrier @classmethod def from_dict(cls, d: dict): @@ -120,7 +122,8 @@ class Request: guided_grammar=d.get("guided_grammar", None), structural_tag=d.get("structural_tag", None), guided_json_object=d.get("guided_json_object", None), - enable_thinking=d.get("enable_thinking", True)) + enable_thinking=d.get("enable_thinking", True), + trace_carrier=d.get("trace_carrier", {})) def to_dict(self) -> dict: """convert Request into a serializable dict """ @@ -142,7 +145,8 @@ class Request: "raw_request": self.raw_request, "disaggregate_info": self.disaggregate_info, "draft_token_ids": self.draft_token_ids, - "enable_thinking": self.enable_thinking + "enable_thinking": self.enable_thinking, + "trace_carrier": self.trace_carrier } add_params = [ "guided_json", "guided_regex", "guided_choice", "guided_grammar", diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index c06e99edf..ef9a9881e 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -24,6 +24,7 @@ import zmq from fastapi import FastAPI, Request from fastapi.responses import JSONResponse, Response, StreamingResponse from prometheus_client import CONTENT_TYPE_LATEST +from fastdeploy.metrics.trace_util import inject_to_metadata from fastdeploy.engine.args_utils import EngineArgs from fastdeploy.engine.engine import LLMEngine @@ -210,6 +211,7 @@ async def create_chat_completion(request: ChatCompletionRequest): return JSONResponse( content={"error": "Worker Service Not Healthy"}, status_code=304) + inject_to_metadata(request) generator = await app.state.chat_handler.create_chat_completion(request) if isinstance(generator, ErrorResponse): diff --git a/fastdeploy/metrics/trace_util.py b/fastdeploy/metrics/trace_util.py new file mode 100644 index 000000000..2ed6fd812 --- /dev/null +++ b/fastdeploy/metrics/trace_util.py @@ -0,0 +1,144 @@ +from opentelemetry.propagate import inject, extract +from opentelemetry import trace + +import json +import os + +# create global OpenTelemetry tracer +tracer = trace.get_tracer(__name__) + +# OpenTelemetry Trace context store in metadata +TRACE_CARRIER = "trace_carrier" + +def inject_to_metadata(request, metadata_attr='metadata'): + """ + Inject OpenTelemetry trace context into the metadata field of the request. + + Parameters: + request: can be a dict or object, with metadata attributes or fields. + metadata_attr: the field name of metadata, default is 'metadata'. + + Operation: + - If metadata does not exist, create a new one and mount it on the request. + - Inject the current trace context as a JSON string and store it in metadata. + - Use the key TRACE_CARRIER to store the injected content. + + Note: + - This function is a non-blocking operation, and errors are silently ignored. + - If there is no metadata attribute in the request, an empty dict will be created for it as its attribute + """ + try: + if request is None: + return + if is_opentelemetry_instrumented() == False: + return + + metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None) + if metadata is None: + metadata = {} + if isinstance(request, dict): + request[metadata_attr] = metadata + else: + setattr(request, metadata_attr, metadata) + + trace_carrier = {} + inject(trace_carrier) + trace_carrier_json_string = json.dumps(trace_carrier) + metadata[TRACE_CARRIER] = trace_carrier_json_string + except: + pass + +def extract_from_metadata(request, metadata_attr='metadata'): + """ + Extract trace context from metadata of request object (dict or class instance). + + Parameters: + request: can be a dictionary or any object, containing metadata attributes or fields. + metadata_attr: metadata field name, default is 'metadata'. + + Returns: + - Extraction success: returns OpenTelemetry context object (Context) + - Extraction failure or exception: returns None + """ + try: + metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None) + if metadata is None: + return None + + trace_carrier_json_string = metadata.get(TRACE_CARRIER) + if trace_carrier_json_string is None: + return None + + trace_carrier = json.loads(trace_carrier_json_string) + ctx = extract(trace_carrier) + return ctx + except: + return None + + +def extract_from_request(request): + """ + Extract trace context from trace_carrier of request object (dict or class instance). + + Parameters: + request: can be a dictionary or any object, containing metadata attributes or fields. + metadata_attr: metadata field name, default is 'metadata'. + + Returns: + - Extraction success: returns OpenTelemetry context object (Context) + - Extraction failure or exception: returns None + """ + try: + trace_carrier_info = getattr(request, TRACE_CARRIER, None) + + if trace_carrier_info is None: + return None + + trace_carrier = json.loads(trace_carrier_info) + ctx = extract(trace_carrier) + return ctx + except: + return None + +def start_span(span_name, request, kind=trace.SpanKind.CLIENT): + """ + just start a new span in request trace context + """ + try: + if is_opentelemetry_instrumented() == False: + return + # extract Trace context from request.metadata.trace_carrier + ctx = extract_from_metadata(request) + with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span: + pass + except: + pass + +def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT): + """ + just start a new span in request trace context + """ + try: + if is_opentelemetry_instrumented() == False: + return + # extract Trace context from request.metadata.trace_carrier + ctx = extract_from_request(request) + with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span: + pass + except: + pass + +def is_opentelemetry_instrumented() -> bool: + """ + check OpenTelemetry is start or not + """ + try: + return ( + os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None + or os.getenv("OTEL_SERVICE_NAME") is not None + or os.getenv("OTEL_TRACES_EXPORTER") is not None + ) + except Exception: + return False + + diff --git a/requirements.txt b/requirements.txt index 72f79add4..1432d9c1f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,9 @@ triton==3.3 use-triton-in-paddle crcmod fastsafetensors==0.1.14 +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-instrumentation-redis +opentelemetry-instrumentation-mysql +opentelemetry-distro  +opentelemetry-exporter-otlp diff --git a/setup.py b/setup.py index d19fa6689..f7ec5c14b 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,9 @@ import re import sys import paddle import subprocess +from setuptools import setup +from setuptools.command.install import install +from pathlib import Path from pathlib import Path from setuptools import Extension, find_packages, setup from setuptools.command.build_ext import build_ext @@ -137,8 +140,17 @@ class CMakeBuild(build_ext): cwd=build_temp, check=True) subprocess.run(["cmake", "--build", ".", *build_args], - cwd=build_temp, - check=True) + cwd=build_temp, + check=True) + +class PostInstallCommand(install): + """在标准安装完成后执行自定义命令""" + def run(self): + # 先执行标准安装步骤 + install.run(self) + # 执行自定义命令 + subprocess.check_call(["opentelemetry-bootstrap", "-a", "install"]) + subprocess.check_call(["pip", "install", "opentelemetry-instrumentation-fastapi"]) def load_requirements(): """Load dependencies from requirements.txt""" @@ -169,6 +181,8 @@ def get_name(): cmdclass_dict = {'bdist_wheel': CustomBdistWheel} cmdclass_dict['build_ext'] = CMakeBuild +FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev") +cmdclass_dict['build_optl'] = PostInstallCommand setup( name=get_name(), @@ -211,3 +225,4 @@ setup( python_requires=">=3.7", extras_require={"test": ["pytest>=6.0"]}, ) +