diff --git a/drivers/plugins/ai-formatter/executor.go b/drivers/plugins/ai-formatter/executor.go index 4f7b474f..9388d2fa 100644 --- a/drivers/plugins/ai-formatter/executor.go +++ b/drivers/plugins/ai-formatter/executor.go @@ -161,15 +161,20 @@ func (e *executor) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IC if len(balances) == 0 { body := ctx.Response().GetBody() if len(body) == 0 { - ctx.Response().SetBody([]byte(err.Error())) - ctx.Response().SetStatus(400, "Bad Request") + if ctx.Response().StatusCode() != 504 { + ctx.Response().SetBody([]byte(err.Error())) + ctx.Response().SetStatus(400, "Bad Request") + } + } return err } err = e.doBalance(ctx, cloneProxy, next) // Fallback to balance logic if err != nil { - ctx.Response().SetBody([]byte(err.Error())) - ctx.Response().SetStatus(400, "Bad Request") + if ctx.Response().StatusCode() != 504 { + ctx.Response().SetBody([]byte(err.Error())) + ctx.Response().SetStatus(400, "Bad Request") + } return err } } diff --git a/go.mod b/go.mod index c7011df4..f500d582 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/clbanning/mxj v1.8.4 github.com/coocood/freecache v1.2.2 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.21.1 + github.com/eolinker/eosc v0.21.2 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 diff --git a/node/http-context/context.go b/node/http-context/context.go index da24e461..1b270328 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -183,6 +183,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti host := node.Addr() request := ctx.proxyRequest.Request() + request.CloseBodyStream() rewriteHost := string(request.Host()) upstreamHost := ctx.GetUpstreamHostHandler() if upstreamHost != nil { @@ -205,6 +206,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti beginTime := time.Now() response := fasthttp.AcquireResponse() + //var client *fasthttp.HostClient ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, rewriteHost, node, request, response, timeout) agent := newRequestAgent(&ctx.proxyRequest, host, scheme, response.Header, beginTime, time.Now()) @@ -230,33 +232,18 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti // 流式传输,非200状态码不考虑流式传输 ctx.response.Response.SetStatusCode(response.StatusCode()) ctx.SetLabel("stream_running", "true") - ctx.response.Response.SetBodyStreamWriter(func(w *bufio.Writer) { + reader := response.BodyStream() defer func() { + response.SetConnectionClose() ctx.SetLabel("stream_running", "false") ctx.FastFinish() fasthttp.ReleaseResponse(response) }() - reader := response.BodyStream() + buffer := make([]byte, 4096) // 4KB 缓冲区 for { n, err := reader.Read(buffer) - if n > 0 { - chunk := buffer[:n] - chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk) - if err != nil { - log.Errorf("exec stream func error: %v", err) - } - - n, err = w.Write(chunk) - if err != nil { - log.Errorf("stream write error: %v", err) - break - } - ctx.Response().SetBody(chunk) - - w.Flush() // 实时发送数据 - } if err != nil { if err == io.EOF { break @@ -264,6 +251,27 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti log.Errorf("stream read error: %v", err) break } + chunk := buffer[:n] + chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk) + if err != nil { + log.Errorf("exec stream func error: %v", err) + } + + n, err = w.Write(chunk) + if err != nil { + log.Errorf("stream write error: %v", err) + response.SetConnectionClose() + break + } + ctx.Response().SetBody(chunk) + + err = w.Flush() // 实时发送数据 + if err != nil { + // 停止读取上游数据 + log.Errorf("stream flush error: %v", err) + response.SetConnectionClose() + break + } } ctx.proxyRequest.ProxyBodyFinish(ctx) })