From 33d4a5a6ddb1445a55696b8393cc3c2cd4e1dee2 Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Sun, 26 Oct 2025 22:18:48 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E4=BF=AE=E5=A4=8Dai=20chat=E6=97=B6?= =?UTF-8?q?=EF=BC=8C504=E7=8A=B6=E6=80=81=E7=A0=81=E8=BF=94=E5=9B=9E400?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98=202.=20=E4=BF=AE=E5=A4=8D=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E5=8F=98=E9=87=8F=E9=87=8D=E5=90=AF=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD=E5=A4=B1=E8=B4=A5=E5=AF=BC=E8=87=B4=E6=89=80=E6=9C=89?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=97=A0=E6=B3=95=E5=8A=A0=E8=BD=BD=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/plugins/ai-formatter/executor.go | 13 ++++--- go.mod | 2 +- node/http-context/context.go | 44 ++++++++++++++---------- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/drivers/plugins/ai-formatter/executor.go b/drivers/plugins/ai-formatter/executor.go index 4f7b474f..9388d2fa 100644 --- a/drivers/plugins/ai-formatter/executor.go +++ b/drivers/plugins/ai-formatter/executor.go @@ -161,15 +161,20 @@ func (e *executor) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IC if len(balances) == 0 { body := ctx.Response().GetBody() if len(body) == 0 { - ctx.Response().SetBody([]byte(err.Error())) - ctx.Response().SetStatus(400, "Bad Request") + if ctx.Response().StatusCode() != 504 { + ctx.Response().SetBody([]byte(err.Error())) + ctx.Response().SetStatus(400, "Bad Request") + } + } return err } err = e.doBalance(ctx, cloneProxy, next) // Fallback to balance logic if err != nil { - ctx.Response().SetBody([]byte(err.Error())) - ctx.Response().SetStatus(400, "Bad Request") + if ctx.Response().StatusCode() != 504 { + ctx.Response().SetBody([]byte(err.Error())) + ctx.Response().SetStatus(400, "Bad Request") + } return err } } diff --git a/go.mod b/go.mod index c7011df4..f500d582 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/clbanning/mxj v1.8.4 github.com/coocood/freecache v1.2.2 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.21.1 + github.com/eolinker/eosc v0.21.2 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 diff --git a/node/http-context/context.go b/node/http-context/context.go index da24e461..1b270328 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -183,6 +183,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti host := node.Addr() request := ctx.proxyRequest.Request() + request.CloseBodyStream() rewriteHost := string(request.Host()) upstreamHost := ctx.GetUpstreamHostHandler() if upstreamHost != nil { @@ -205,6 +206,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti beginTime := time.Now() response := fasthttp.AcquireResponse() + //var client *fasthttp.HostClient ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, rewriteHost, node, request, response, timeout) agent := newRequestAgent(&ctx.proxyRequest, host, scheme, response.Header, beginTime, time.Now()) @@ -230,33 +232,18 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti // 流式传输,非200状态码不考虑流式传输 ctx.response.Response.SetStatusCode(response.StatusCode()) ctx.SetLabel("stream_running", "true") - ctx.response.Response.SetBodyStreamWriter(func(w *bufio.Writer) { + reader := response.BodyStream() defer func() { + response.SetConnectionClose() ctx.SetLabel("stream_running", "false") ctx.FastFinish() fasthttp.ReleaseResponse(response) }() - reader := response.BodyStream() + buffer := make([]byte, 4096) // 4KB 缓冲区 for { n, err := reader.Read(buffer) - if n > 0 { - chunk := buffer[:n] - chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk) - if err != nil { - log.Errorf("exec stream func error: %v", err) - } - - n, err = w.Write(chunk) - if err != nil { - log.Errorf("stream write error: %v", err) - break - } - ctx.Response().SetBody(chunk) - - w.Flush() // 实时发送数据 - } if err != nil { if err == io.EOF { break @@ -264,6 +251,27 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti log.Errorf("stream read error: %v", err) break } + chunk := buffer[:n] + chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk) + if err != nil { + log.Errorf("exec stream func error: %v", err) + } + + n, err = w.Write(chunk) + if err != nil { + log.Errorf("stream write error: %v", err) + response.SetConnectionClose() + break + } + ctx.Response().SetBody(chunk) + + err = w.Flush() // 实时发送数据 + if err != nil { + // 停止读取上游数据 + log.Errorf("stream flush error: %v", err) + response.SetConnectionClose() + break + } } ctx.proxyRequest.ProxyBodyFinish(ctx) })