[fix] Fix excessive/redundant spans being emitted for streaming requests. (#4375)

* fix stream span
Author: qwes5s5
Date: 2025-10-15 11:47:47 +08:00
Committed by: GitHub
Parent: 28d1b6cd97
Commit: abb62624b8
4 changed files with 422 additions and 11 deletions
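
The change, in brief: instead of emitting extra spans per streamed chunk, the wrapper annotates the request's existing span with events. A condensed, self-contained sketch of that pattern (the names traced_stream and source are illustrative, not from this commit; only opentelemetry-api is needed, and with no SDK configured get_current_span() returns a no-op span whose is_recording() is False):

import time

from opentelemetry import trace


async def traced_stream(source):
    # Annotate the *current* span rather than creating one per chunk.
    span = trace.get_current_span()
    count = 0
    last_time = None
    try:
        async for chunk in source:
            last_time = time.time()
            if count == 0 and span.is_recording():
                span.add_event("first_chunk", {"time": last_time})
            count += 1
            yield chunk
    finally:
        if span.is_recording() and count > 0:
            span.add_event("last_chunk", {"time": last_time, "total_chunk": count})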


@@ -29,6 +29,7 @@ import zmq
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.exceptions import RequestValidationError
 from fastapi.responses import JSONResponse, Response, StreamingResponse
+from opentelemetry import trace
 from prometheus_client import CONTENT_TYPE_LATEST
 
 from fastdeploy.engine.args_utils import EngineArgs
@@ -58,7 +59,12 @@ from fastdeploy.metrics.metrics import (
     get_filtered_metrics,
     main_process_metrics,
 )
-from fastdeploy.metrics.trace_util import fd_start_span, inject_to_metadata, instrument
+from fastdeploy.metrics.trace_util import (
+    fd_start_span,
+    inject_to_metadata,
+    instrument,
+    lable_span,
+)
 from fastdeploy.utils import (
     ExceptionHandler,
     FlexibleArgumentParser,
@@ -291,12 +297,39 @@ def wrap_streaming_generator(original_generator: AsyncGenerator):
     """
 
     async def wrapped_generator():
-        try:
-            async for chunk in original_generator:
-                yield chunk
-        finally:
-            api_server_logger.debug(f"release: {connection_semaphore.status()}")
-            connection_semaphore.release()
+        span = trace.get_current_span()
+        if span is not None and span.is_recording():
+            last_time = None
+            count = 0
+            try:
+                async for chunk in original_generator:
+                    last_time = time.time()
+                    # capture the first chunk
+                    if count == 0 and span is not None and span.is_recording():
+                        last_time = time.time()
+                        span.add_event("first_chunk", {"time": last_time})
+                    count += 1
+                    yield chunk
+            except Exception as e:
+                # capture errors
+                if span is not None and span.is_recording():
+                    span.add_event("stream_error", {"time": time.time(), "error": str(e), "total_chunk": count})
+                    span.record_exception(e)
+                    span.set_status({"code": "ERROR", "description": str(e)})
+                raise
+            finally:
+                # capture the last chunk
+                if span is not None and span.is_recording() and count > 0:
+                    span.add_event("last_chunk", {"time": last_time, "total_chunk": count})
+                api_server_logger.debug(f"release: {connection_semaphore.status()}")
+                connection_semaphore.release()
+        else:
+            try:
+                async for chunk in original_generator:
+                    yield chunk
+            finally:
+                api_server_logger.debug(f"release: {connection_semaphore.status()}")
+                connection_semaphore.release()
 
     return wrapped_generator
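
One note on the error branch above: in upstream opentelemetry-python, Span.set_status() takes a Status object or a StatusCode rather than a plain dict, so the equivalent idiomatic call would be (sketch, assuming the same exception variable e):

from opentelemetry.trace import Status, StatusCode

span.set_status(Status(StatusCode.ERROR, str(e)))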
@@ -314,6 +347,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
     try:
         async with connection_manager():
             inject_to_metadata(request)
+            lable_span(request)
             generator = await app.state.chat_handler.create_chat_completion(request)
             if isinstance(generator, ErrorResponse):
                 api_server_logger.debug(f"release: {connection_semaphore.status()}")
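
For context, lable_span (the spelling matches the identifier exported by fastdeploy.metrics.trace_util) is called before dispatching the request; its implementation is not part of this diff. A purely hypothetical sketch of what such a helper might do, attaching request attributes to the current span (the attribute names here are assumptions):

from opentelemetry import trace


def lable_span_sketch(request):
    # Hypothetical: copy identifying request fields onto the current span.
    span = trace.get_current_span()
    if span.is_recording():
        span.set_attribute("request.model", str(getattr(request, "model", "")))
        span.set_attribute("request.stream", bool(getattr(request, "stream", False)))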
@@ -344,6 +378,7 @@ async def create_completion(request: CompletionRequest):
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
     try:
         async with connection_manager():
+            lable_span(request)
             generator = await app.state.completion_handler.create_completion(request)
             if isinstance(generator, ErrorResponse):
                 connection_semaphore.release()
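
To see the event bookkeeping end to end, a hypothetical smoke test for the traced_stream sketch above, using the in-memory exporter from the opentelemetry-sdk package (everything else is standard SDK wiring):

import asyncio

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)


async def three_chunks():
    for i in range(3):
        yield f"chunk-{i}"


async def main():
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("stream_request"):
        async for _ in traced_stream(three_chunks()):
            pass
    # Exactly one first_chunk and one last_chunk event, no per-chunk spans.
    events = [e.name for e in exporter.get_finished_spans()[0].events]
    assert events == ["first_chunk", "last_chunk"]


asyncio.run(main())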