import json
import os

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.propagate import extract, inject
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from fastdeploy import envs
from fastdeploy.utils import llm_logger

# Key under which the OpenTelemetry trace context is stored in request metadata
TRACE_CARRIER = "trace_carrier"

traces_enable = False
tracer = trace.get_tracer(__name__)

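# Tracing is driven by configuration read through fastdeploy.envs plus the FD_JOB_ID
# environment variable used to tag spans. A typical OTLP setup might look like the
# sketch below (illustrative only; it assumes the envs.* attributes map to environment
# variables of the same name, and the endpoint/header values are deployment-specific
# placeholders):
#
#   TRACES_ENABLE=true
#   TRACES_EXPORTER=otlp
#   EXPORTER_OTLP_ENDPOINT=http://localhost:4318/v1/traces
#   EXPORTER_OTLP_HEADERS="Authentication=***,k2=v2"
#   FD_SERVICE_NAME=fastdeploy
#   FD_HOST_NAME=node-0
#   FD_JOB_ID=job-123
#
# Any other TRACES_EXPORTER value falls back to the console exporter.
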
def set_up():
    try:
        # enable tracing when TRACES_ENABLE=true
        global traces_enable
        traces_enable = envs.TRACES_ENABLE.lower() == "true"
        if not traces_enable:
            llm_logger.warning("Opentelemetry is DISABLED.")
            return

        llm_logger.info("Opentelemetry is ENABLED, configuring...")

        # --- read env ---
        service_name = envs.FD_SERVICE_NAME
        host_name = envs.FD_HOST_NAME

        # --- set resource attributes (service name, host name, etc.) ---
        resource_attributes = {"service.name": service_name}
        if host_name:
            resource_attributes["host.name"] = host_name

        resource = Resource(attributes=resource_attributes)

        # --- set exporter ---
        exporter_type = envs.TRACES_EXPORTER.lower()
        if exporter_type == "otlp":
            endpoint = envs.EXPORTER_OTLP_ENDPOINT  # must be set for the OTLP exporter
            headers = envs.EXPORTER_OTLP_HEADERS  # e.g. "Authentication=***,k2=v2"

            otlp_exporter = OTLPSpanExporter(
                endpoint=endpoint,
                headers=(dict(item.split("=", 1) for item in headers.split(",")) if headers else None),
            )
            processor = BatchSpanProcessor(otlp_exporter)
            llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
        else:  # default to the console exporter
            processor = BatchSpanProcessor(ConsoleSpanExporter())
            llm_logger.info("Using Console Exporter.")

        # --- set tracer provider ---
        provider = TracerProvider(resource=resource)
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)
        global tracer
        tracer = trace.get_tracer(__name__)
    except Exception as e:
        llm_logger.error(f"set_up failed: {e}")

def instrument(app: FastAPI):
    try:
        set_up()
        if traces_enable:
            llm_logger.info("Applying instrumentors...")
            FastAPIInstrumentor.instrument_app(app)
    except Exception as e:
        llm_logger.error(f"instrument failed: {e}")

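# Illustrative wiring (the FastAPI app is created elsewhere in FastDeploy; shown here
# only to indicate the call order: configure first, then instrument):
#
#     app = FastAPI()
#     instrument(app)  # no-op unless TRACES_ENABLE resolves to "true"
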
def inject_to_metadata(request, metadata_attr="metadata"):
    """
    Inject the current OpenTelemetry trace context into the metadata field of the request.

    Parameters:
    request: a dict or an object carrying a metadata attribute or field.
    metadata_attr: the field name of the metadata, default is 'metadata'.

    Operation:
    - If metadata does not exist, create an empty dict and attach it to the request.
    - Inject the current trace context as a JSON string and store it in metadata.
    - Use the key TRACE_CARRIER to store the injected content.

    Note:
    - This function is non-blocking; errors are silently ignored.
    """
    try:
        if request is None or not traces_enable:
            return

        metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
        if metadata is None:
            metadata = {}
            if isinstance(request, dict):
                request[metadata_attr] = metadata
            else:
                setattr(request, metadata_attr, metadata)

        trace_carrier = {}
        inject(trace_carrier)
        trace_carrier_json_string = json.dumps(trace_carrier)
        metadata[TRACE_CARRIER] = trace_carrier_json_string
    except Exception:
        pass

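# Illustrative use with a plain dict request (hypothetical payload, shown only to
# indicate where the carrier ends up):
#
#     req = {"prompt": "hi"}
#     inject_to_metadata(req)
#     # req["metadata"]["trace_carrier"] now holds the JSON-encoded carrier
#     # (typically a W3C traceparent, depending on the configured propagator)
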
def extract_from_metadata(request, metadata_attr="metadata"):
    """
    Extract the trace context from the metadata of a request (dict or class instance).

    Parameters:
    request: a dict or an object carrying a metadata attribute or field.
    metadata_attr: the field name of the metadata, default is 'metadata'.

    Returns:
    - On success: an OpenTelemetry context object (Context).
    - On failure or exception: None.
    """
    try:
        metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
        if metadata is None:
            return None

        trace_carrier_json_string = metadata.get(TRACE_CARRIER)
        if trace_carrier_json_string is None:
            return None

        trace_carrier = json.loads(trace_carrier_json_string)
        ctx = extract(trace_carrier)
        return ctx
    except Exception:
        return None

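# Illustrative consumer-side use (e.g. after the request has been dequeued); the
# returned Context can be passed directly to tracer.start_as_current_span, which is
# what start_span below does (span name is a placeholder):
#
#     ctx = extract_from_metadata(req)
#     with tracer.start_as_current_span("handle_request", context=ctx) as span:
#         ...
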
def extract_from_request(request):
    """
    Extract the trace context from the trace_carrier field of a request object.

    Parameters:
    request: an object carrying a trace_carrier attribute that holds a JSON-encoded carrier.

    Returns:
    - On success: an OpenTelemetry context object (Context).
    - On failure or exception: None.
    """
    try:
        trace_carrier_info = getattr(request, TRACE_CARRIER, None)
        if trace_carrier_info is None:
            return None

        trace_carrier = json.loads(trace_carrier_info)
        ctx = extract(trace_carrier)
        return ctx
    except Exception:
        return None

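# Unlike extract_from_metadata, this reads the carrier from a top-level trace_carrier
# attribute on the request object. Illustrative sketch (class and value are hypothetical):
#
#     class QueuedTask:
#         trace_carrier = '{"traceparent": "00-...-01"}'
#
#     ctx = extract_from_request(QueuedTask())
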
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Start a new span in the trace context carried by the request's metadata.
    """
    try:
        if not traces_enable:
            return
        # extract the trace context from request.metadata.trace_carrier
        ctx = extract_from_metadata(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass

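# Illustrative call site (span name and kind are placeholders): opens a short-lived
# span in the trace injected by inject_to_metadata and tags it with FD_JOB_ID:
#
#     start_span("dequeue", req, kind=trace.SpanKind.CONSUMER)
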
def fd_start_span(span_name, kind=trace.SpanKind.CLIENT):
    """
    Start a new span when FastDeploy starts, to record a successful startup.
    """
    try:
        if not traces_enable:
            return
        with tracer.start_as_current_span(span_name, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass

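# Illustrative: called once at service startup, with no request context to join
# (span name is a placeholder):
#
#     fd_start_span("fd_start")
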
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Start a new span in the trace context carried by the request's trace_carrier field.
    """
    try:
        if not traces_enable:
            return
        # extract the trace context from request.trace_carrier
        ctx = extract_from_request(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass
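# Illustrative: same pattern as start_span, but for requests that carry the context in
# a top-level trace_carrier attribute instead of metadata (names are placeholders):
#
#     start_span_request("dequeue", task, kind=trace.SpanKind.CONSUMER)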