diff --git a/ai-convert/label.go b/ai-convert/label.go index e345cf19..cd84440a 100644 --- a/ai-convert/label.go +++ b/ai-convert/label.go @@ -8,6 +8,7 @@ var ( AIModelTotalTokenLabel = "ai_model_total_token" AIModelModeLabel = "ai_model_mode" AIModelLabel = "ai_model" + AIKeyLabel = "ai_key" AIProviderLabel = "ai_provider" AIProviderStatusesLabel = "ai_provider_statuses" AIModelStatusLabel = "ai_model_status" @@ -69,6 +70,14 @@ func GetAIModel(ctx eocontext.EoContext) string { return valueString(ctx, AIModelLabel) } +func SetAIKey(ctx eocontext.EoContext, key string) { + ctx.WithValue(AIKeyLabel, key) +} + +func GetAIKey(ctx eocontext.EoContext) string { + return valueString(ctx, AIKeyLabel) +} + func SetAIProvider(ctx eocontext.EoContext, provider string) { ctx.WithValue(AIProviderLabel, provider) } diff --git a/ai-convert/manager.go b/ai-convert/manager.go index bf8ffa7a..1cb2b411 100644 --- a/ai-convert/manager.go +++ b/ai-convert/manager.go @@ -86,6 +86,7 @@ func (m *KeyPoolManager) Set(id string, resource IKeyResource) { sort.Slice(keys, func(i, j int) bool { return keys[i].Priority() < keys[j].Priority() }) + m.keySorts.Set(id, keys) } diff --git a/ai-convert/mapping.go b/ai-convert/mapping.go index 6af8f99d..4802227f 100644 --- a/ai-convert/mapping.go +++ b/ai-convert/mapping.go @@ -36,12 +36,12 @@ func TransformData(inputJSON string, mappingRule MappingRule) (map[string]interf if err != nil { return nil, fmt.Errorf("类型转换失败 %s -> %s: %v", key, v.Value, err) } - //if value == "response_format" { - // resultMap[v.Value] = map[string]interface{}{ - // "type": convertedValue, - // } - // continue - //} + if value == "response_format" { + resultMap[v.Value] = map[string]interface{}{ + "type": convertedValue, + } + continue + } resultMap[v.Value] = convertedValue } else { resultMap[key] = value diff --git a/ai-convert/message.go b/ai-convert/message.go index ca443fca..d0c4aa16 100644 --- a/ai-convert/message.go +++ b/ai-convert/message.go @@ -6,7 +6,24 @@ import ( ) // Request 定义客户端统一输入请求格式 -type Request openai.ChatCompletionRequest +type Request struct { + Model string `json:"model"` + Messages []openai.ChatCompletionMessage `json:"messages"` + // MaxTokens The maximum number of tokens that can be generated in the chat completion. + // This value can be used to control costs for text generated via API. + // This value is now deprecated in favor of max_completion_tokens, and is not compatible with o1 series models. + // refs: https://platform.openai.com/docs/api-reference/chat/create#chat-create-max_tokens + MaxTokens int `json:"max_tokens,omitempty"` + // MaxCompletionTokens An upper bound for the number of tokens that can be generated for a completion, + // including visible output tokens and reasoning tokens https://platform.openai.com/docs/guides/reasoning + MaxCompletionTokens int `json:"max_completion_tokens,omitempty"` + Temperature float32 `json:"temperature,omitempty"` + TopP float32 `json:"top_p,omitempty"` + N int `json:"n,omitempty"` + Stream bool `json:"stream,omitempty"` + Stop []string `json:"stop,omitempty"` + PresencePenalty float32 `json:"presence_penalty,omitempty"` +} // Response 定义客户端统一输出响应格式 type Response struct { diff --git a/ai-convert/openai.go b/ai-convert/openai.go index 714d85a8..f94f6be0 100644 --- a/ai-convert/openai.go +++ b/ai-convert/openai.go @@ -1,8 +1,6 @@ package ai_convert import ( - "bufio" - "bytes" "encoding/json" "fmt" "net/url" @@ -49,6 +47,9 @@ func NewOpenAIConvert(apikey string, baseUrl string, timeout time.Duration, chec if err != nil { return nil, err } + if strings.TrimSuffix(u.Path, "/") == "" { + u.Path = "/v1" + } c.path = fmt.Sprintf("%s%s", strings.TrimSuffix(u.Path, "/"), OpenAIChatCompletePath) } else { c.path = fmt.Sprintf("/v1%s", OpenAIChatCompletePath) @@ -66,7 +67,7 @@ func (o *OpenAIConvert) RequestConvert(ctx eoscContext.EoContext, extender map[s return err } var promptToken int - chatRequest := eosc.NewBase[openai.ChatCompletionRequest](extender) + chatRequest := eosc.NewBase[Request](extender) err = json.Unmarshal(body, chatRequest) if err != nil { return fmt.Errorf("unmarshal body error: %v, body: %s", err, string(body)) @@ -89,6 +90,7 @@ func (o *OpenAIConvert) RequestConvert(ctx eoscContext.EoContext, extender map[s if o.balanceHandler != nil { ctx.SetBalance(o.balanceHandler) } + httpContext.AppendBodyFinishFunc(o.bodyFinish) return nil } @@ -133,36 +135,63 @@ func (o *OpenAIConvert) ResponseConvert(ctx eoscContext.EoContext) error { return ResponseConvert(ctx, o.checkErr, o.errorCallback) } +func (o *OpenAIConvert) bodyFinish(ctx http_service.IHttpContext) { + body := ctx.Response().GetBody() + defer func() { + SetAIProviderStatuses(ctx, AIProviderStatus{ + Provider: GetAIProvider(ctx), + Model: GetAIModel(ctx), + Key: GetAIKey(ctx), + Status: GetAIStatus(ctx), + }) + }() + if o.checkErr != nil && !o.checkErr(ctx, body) { + o.errorCallback(ctx, body) + return + } + encoding := ctx.Response().Headers().Get("content-encoding") + if encoding == "gzip" { + tmp, err := encoder.ToUTF8(encoding, body) + if err != nil { + log.Errorf("convert to utf-8 error: %v, body: %s", err, string(body)) + return + } + var resp openai.ChatCompletionResponse + err = json.Unmarshal(tmp, &resp) + if err != nil { + log.Errorf("unmarshal body error: %v, body: %s", err, string(tmp)) + return + } + SetAIModelInputToken(ctx, resp.Usage.PromptTokens) + SetAIModelOutputToken(ctx, resp.Usage.CompletionTokens) + SetAIModelTotalToken(ctx, resp.Usage.TotalTokens) + } +} + func (o *OpenAIConvert) streamHandler(ctx http_service.IHttpContext, p []byte) ([]byte, error) { + encoding := ctx.Response().Headers().Get("content-encoding") + if encoding == "gzip" { + return p, nil + } // 对响应数据进行划分 inputToken := GetAIModelInputToken(ctx) outputToken := 0 totalToken := inputToken - scanner := bufio.NewScanner(bytes.NewReader(p)) - // Check the content encoding and convert to UTF-8 if necessary. - encoding := ctx.Response().Headers().Get("content-encoding") - for scanner.Scan() { - line := scanner.Text() - if encoding != "utf-8" && encoding != "" { - tmp, err := encoder.ToUTF8(encoding, []byte(line)) - if err != nil { - log.Errorf("convert to utf-8 error: %v, line: %s", err, line) - return p, nil - } - if ctx.Response().StatusCode() != 200 || (o.checkErr != nil && !o.checkErr(ctx, tmp)) { - if o.errorCallback != nil { - o.errorCallback(ctx, tmp) - } - return p, nil - } - line = string(tmp) + + line := string(p) + if encoding != "utf-8" && encoding != "" { + tmp, err := encoder.ToUTF8(encoding, p) + if err != nil { + log.Errorf("convert to utf-8 error: %v, line: %s", err, line) + return p, nil } + line = string(tmp) line = strings.TrimPrefix(line, "data:") if line == "" || strings.Trim(line, " ") == "[DONE]" { return p, nil } var resp openai.ChatCompletionResponse - err := json.Unmarshal([]byte(line), &resp) + err = json.Unmarshal([]byte(line), &resp) if err != nil { return p, nil } @@ -171,10 +200,6 @@ func (o *OpenAIConvert) streamHandler(ctx http_service.IHttpContext, p []byte) ( totalToken += outputToken } } - if err := scanner.Err(); err != nil { - log.Errorf("scan error: %v", err) - return p, nil - } SetAIModelInputToken(ctx, inputToken) SetAIModelOutputToken(ctx, outputToken) diff --git a/drivers/ai-provider/bedrock/bedrock.go b/drivers/ai-provider/bedrock/bedrock.go index 82f51601..8dbc58d4 100644 --- a/drivers/ai-provider/bedrock/bedrock.go +++ b/drivers/ai-provider/bedrock/bedrock.go @@ -142,6 +142,7 @@ func (c *Convert) RequestConvert(ctx eocontext.EoContext, extender map[string]in } httpContext.Proxy().Body().SetRaw("application/json", body) httpContext.Response().AppendStreamFunc(c.streamHandler) + ctx.SetLabel("response-content-type", "text/event-stream") return nil } diff --git a/drivers/ai-provider/init.go b/drivers/ai-provider/init.go index dfc019cb..b957dfd6 100644 --- a/drivers/ai-provider/init.go +++ b/drivers/ai-provider/init.go @@ -15,6 +15,7 @@ import ( _ "github.com/eolinker/apinto/drivers/ai-provider/hugging-face" _ "github.com/eolinker/apinto/drivers/ai-provider/hunyuan" _ "github.com/eolinker/apinto/drivers/ai-provider/lm-studio" + _ "github.com/eolinker/apinto/drivers/ai-provider/local-model" _ "github.com/eolinker/apinto/drivers/ai-provider/minimax" _ "github.com/eolinker/apinto/drivers/ai-provider/mistralai" _ "github.com/eolinker/apinto/drivers/ai-provider/moonshot" diff --git a/drivers/ai-provider/local-model/local.go b/drivers/ai-provider/local-model/local.go new file mode 100644 index 00000000..3fd9fa70 --- /dev/null +++ b/drivers/ai-provider/local-model/local.go @@ -0,0 +1,71 @@ +package local_model + +import ( + "encoding/json" + "fmt" + "net/url" + + ai_convert "github.com/eolinker/apinto/ai-convert" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +func init() { + ai_convert.RegisterConverterCreateFunc("LocalModel", Create) +} + +type Config struct { + BaseUrl string `json:"base"` +} + +// checkConfig validates the provided configuration. +// It ensures the required fields are set and checks the validity of the Base URL if provided. +// +// Parameters: +// - v: An interface{} expected to be a pointer to a Config struct. +// +// Returns: +// - *Config: The validated configuration cast to *Config. +// - error: An error if the validation fails, or nil if it succeeds. +func checkConfig(conf *Config) error { + if conf.BaseUrl == "" { + return fmt.Errorf("base url is required") + } + u, err := url.Parse(conf.BaseUrl) + if err != nil { + // Return an error if the Base URL cannot be parsed. + return fmt.Errorf("base url is invalid") + } + // Ensure the parsed URL contains both a scheme and a host. + if u.Scheme == "" || u.Host == "" { + return fmt.Errorf("base url is invalid") + } + return nil +} + +func Create(cfg string) (ai_convert.IConverter, error) { + var conf Config + err := json.Unmarshal([]byte(cfg), &conf) + if err != nil { + return nil, err + } + err = checkConfig(&conf) + if err != nil { + return nil, err + } + + return ai_convert.NewOpenAIConvert("", conf.BaseUrl, 0, nil, errorCallback) +} + +func errorCallback(ctx http_service.IHttpContext, body []byte) { + + switch ctx.Response().StatusCode() { + case 400: + // Handle the bad request error. + ai_convert.SetAIStatusInvalidRequest(ctx) + case 401: + // Handle the key error. + ai_convert.SetAIStatusInvalid(ctx) + default: + ai_convert.SetAIStatusInvalidRequest(ctx) + } +} diff --git a/drivers/ai-provider/mapping.go b/drivers/ai-provider/mapping.go index 1418b8d2..ec099913 100644 --- a/drivers/ai-provider/mapping.go +++ b/drivers/ai-provider/mapping.go @@ -40,10 +40,10 @@ var ( Value: "top_k", Type: "int", }, - //{ - // Value: "response_format", - // Type: "string", - //}, + { + Value: "response_format", + Type: "string", + }, { Value: "stream", Type: "bool", diff --git a/drivers/output/loki/output.go b/drivers/output/loki/output.go index 15008229..df28e1b3 100644 --- a/drivers/output/loki/output.go +++ b/drivers/output/loki/output.go @@ -70,7 +70,7 @@ func (o *Output) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker } func (o *Output) reset(conf *Config) error { - if reflect.DeepEqual(conf, o.outputChan) { + if reflect.DeepEqual(conf, o.conf) { return nil } //创建formatter @@ -136,7 +136,7 @@ func (o *Output) Output(entry eosc.IEntry) error { }, }, } - return eosc.ErrorWorkerNotRunning + return nil } func (o *Output) genRequest(data []byte) (*http.Request, error) { diff --git a/drivers/plugins/ai-formatter/executor.go b/drivers/plugins/ai-formatter/executor.go index c40b54be..170ff2d7 100644 --- a/drivers/plugins/ai-formatter/executor.go +++ b/drivers/plugins/ai-formatter/executor.go @@ -76,7 +76,10 @@ func (e *executor) doConverter(ctx http_context.IHttpContext, next eocontext.ICh } } if ctx.Response().IsBodyStream() { - ctx.Response().SetHeader("Content-Type", "text/event-stream") + contentType := ctx.GetLabel("response-content-type") + if ctx.GetLabel("response-content-type") != "" { + ctx.Response().SetHeader("Content-Type", contentType) + } return nil } if err := resource.ResponseConvert(ctx); err != nil { @@ -104,6 +107,7 @@ func (e *executor) tryProvider(ctx http_context.IHttpContext, originProxy http_c } ctx.SetProxy(originProxy) for _, resource := range resources { + ai_convert.SetAIKey(ctx, resource.ID()) err := e.doConverter(ctx, next, resource, provider, extender) if err != nil { log.Errorf("try provider error: %v", err) @@ -138,8 +142,21 @@ func (e *executor) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IC if ai_convert.GetAIModel(ctx) == "" { ai_convert.SetAIModel(ctx, e.model) } - + defer func() { + // If the request is successful, set the AI provider and model in the response headers + ctx.Response().SetHeader("X-AI-Provider", ai_convert.GetAIProvider(ctx)) + ctx.Response().SetHeader("X-AI-Model", ai_convert.GetAIModel(ctx)) + }() if err := e.processKeyPool(ctx, provider, cloneProxy, next); err != nil { + balances := ai_convert.Balances() + if len(balances) == 0 { + body := ctx.Response().GetBody() + if len(body) == 0 { + ctx.Response().SetBody([]byte(err.Error())) + ctx.Response().SetStatus(400, "Bad Request") + } + return err + } err = e.doBalance(ctx, cloneProxy, next) // Fallback to balance logic if err != nil { ctx.Response().SetBody([]byte(err.Error())) @@ -147,9 +164,6 @@ func (e *executor) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IC return err } } - // If the request is successful, set the AI provider and model in the response headers - ctx.Response().SetHeader("X-AI-Provider", ai_convert.GetAIProvider(ctx)) - ctx.Response().SetHeader("X-AI-Model", ai_convert.GetAIModel(ctx)) return nil } @@ -176,7 +190,7 @@ func (e *executor) processKeyPool(ctx http_context.IHttpContext, provider string if !r.Health() { continue } - + ai_convert.SetAIKey(ctx, r.ID()) if err = r.RequestConvert(ctx, extender); err != nil { ai_convert.SetAIProviderStatuses(ctx, ai_convert.AIProviderStatus{ Provider: e.provider, @@ -201,7 +215,10 @@ func (e *executor) processKeyPool(ctx http_context.IHttpContext, provider string } } if ctx.Response().IsBodyStream() { - ctx.Response().SetHeader("Content-Type", "text/event-stream") + contentType := ctx.GetLabel("response-content-type") + if ctx.GetLabel("response-content-type") != "" { + ctx.Response().SetHeader("Content-Type", contentType) + } return nil } if err = r.ResponseConvert(ctx); err != nil { diff --git a/drivers/router/http-router/finish.go b/drivers/router/http-router/finish.go index 9c72f9ac..07716f15 100644 --- a/drivers/router/http-router/finish.go +++ b/drivers/router/http-router/finish.go @@ -15,6 +15,7 @@ func (f *Finisher) Finish(org eocontext.EoContext) error { if err != nil { return err } + ctx.SetLabel("current_running", "false") ctx.FastFinish() return nil diff --git a/drivers/router/http-router/http-complete/complete.go b/drivers/router/http-router/http-complete/complete.go index ca0da4ea..ebe4a891 100644 --- a/drivers/router/http-router/http-complete/complete.go +++ b/drivers/router/http-router/http-complete/complete.go @@ -2,6 +2,7 @@ package http_complete import ( "errors" + "fmt" "strconv" "strings" @@ -34,7 +35,10 @@ func (h *HttpComplete) Complete(org eocontext.EoContext) error { } //设置响应开始时间 proxyTime := time.Now() - + balance := ctx.GetBalance() + if balance == nil { + return fmt.Errorf("balance not found") + } defer func() { //设置原始响应状态码 ctx.Response().SetProxyStatus(ctx.Response().StatusCode(), "") @@ -42,8 +46,6 @@ func (h *HttpComplete) Complete(org eocontext.EoContext) error { ctx.SetLabel("handler", "proxy") }() - balance := ctx.GetBalance() - scheme := balance.Scheme() switch strings.ToLower(scheme) { diff --git a/drivers/router/http-router/manager/manager.go b/drivers/router/http-router/manager/manager.go index d36b49d1..d605a5d6 100644 --- a/drivers/router/http-router/manager/manager.go +++ b/drivers/router/http-router/manager/manager.go @@ -98,6 +98,7 @@ func (m *Manager) FastHandler(port int, ctx *fasthttp.RequestCtx) { (*globalFilters).Chain(httpContext, completeCaller) } } else { + httpContext.SetLabel("current_running", "true") log.Debug("match has:", port) r.Serve(httpContext) } diff --git a/encoder/gzip.go b/encoder/gzip.go index 525b1585..e2dda6bd 100644 --- a/encoder/gzip.go +++ b/encoder/gzip.go @@ -1,35 +1,40 @@ package encoder import ( - "bytes" - "compress/gzip" - "io" + "github.com/valyala/bytebufferpool" + "github.com/valyala/fasthttp" ) func init() { - //encoderManger.Set("gzip", &Gzip{}) + encoderManger.Set("gzip", &Gzip{}) } type Gzip struct { } func (g *Gzip) ToUTF8(data []byte) ([]byte, error) { + var bb bytebufferpool.ByteBuffer + _, err := fasthttp.WriteGunzip(&bb, data) + if err != nil { + return nil, err + } + return bb.B, nil // 创建一个gzip reader - reader, err := gzip.NewReader(bytes.NewReader(data)) - if err != nil { - return nil, err - } - defer reader.Close() - - // 读取解压后的数据 - var buf bytes.Buffer - _, err = io.Copy(&buf, reader) - if err != nil { - return nil, err - } - - // 返回解压后的数据 - // 注意:这里假设解压后的数据已经是UTF-8编码 - // 如果需要处理其他编码转UTF-8,需要额外的转换步骤 - return buf.Bytes(), nil + //reader, err := gzip.NewReader(bytes.NewReader(data)) + //if err != nil { + // return nil, err + //} + //defer reader.Close() + // + //// 读取解压后的数据 + //var buf bytes.Buffer + //_, err = io.Copy(&buf, reader) + //if err != nil { + // return nil, err + //} + // + //// 返回解压后的数据 + //// 注意:这里假设解压后的数据已经是UTF-8编码 + //// 如果需要处理其他编码转UTF-8,需要额外的转换步骤 + //return buf.Bytes(), nil } diff --git a/go.mod b/go.mod index 7499faf4..af7587a8 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/brianvoe/gofakeit/v6 v6.20.1 github.com/clbanning/mxj v1.8.4 github.com/coocood/freecache v1.2.2 + //github.com/dgrr/http2 v0.3.5 github.com/dubbogo/gost v1.13.1 github.com/eolinker/eosc v0.20.3 github.com/fasthttp/websocket v1.5.0 @@ -20,7 +21,6 @@ require ( github.com/hashicorp/consul/api v1.9.1 github.com/influxdata/influxdb-client-go/v2 v2.12.1 github.com/jhump/protoreflect v1.16.0 - github.com/joho/godotenv v1.3.0 github.com/lestrrat-go/jwx v1.2.28 github.com/nacos-group/nacos-sdk-go/v2 v2.2.3 github.com/nsqio/go-nsq v1.1.0 @@ -35,9 +35,9 @@ require ( github.com/stretchr/testify v1.9.0 github.com/traefik/yaegi v0.16.1 github.com/urfave/cli/v2 v2.23.4 - github.com/valyala/fasthttp v1.47.0 - golang.org/x/crypto v0.21.0 - golang.org/x/net v0.22.0 + github.com/valyala/fasthttp v1.59.0 + golang.org/x/crypto v0.33.0 + golang.org/x/net v0.35.0 golang.org/x/oauth2 v0.14.0 google.golang.org/api v0.149.0 google.golang.org/grpc v1.61.0 @@ -105,7 +105,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel/metric v1.20.0 // indirect - golang.org/x/sync v0.6.0 // indirect + golang.org/x/sync v0.11.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect @@ -116,7 +116,7 @@ require ( require ( dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220519062747-f6405fa79d5c - github.com/andybalholm/brotli v1.0.5 + github.com/andybalholm/brotli v1.1.1 github.com/apache/dubbo-go-hessian2 v1.11.6 github.com/armon/go-metrics v0.3.9 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -131,7 +131,7 @@ require ( github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/fatih/color v1.9.0 // indirect + github.com/fatih/color v1.13.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -155,13 +155,13 @@ require ( github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/klauspost/compress v1.16.3 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/fs v0.1.0 // indirect - github.com/mattn/go-colorable v0.1.8 // indirect + github.com/mattn/go-colorable v0.1.9 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect @@ -173,7 +173,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.etcd.io/bbolt v1.3.9 // indirect @@ -190,8 +190,8 @@ require ( go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.23.0 - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 golang.org/x/time v0.1.0 // indirect google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect diff --git a/node/fasthttp-client/client.go b/node/fasthttp-client/client.go index fb57be0a..590b50d0 100644 --- a/node/fasthttp-client/client.go +++ b/node/fasthttp-client/client.go @@ -12,7 +12,6 @@ import ( "time" "github.com/eolinker/eosc/eocontext" - "github.com/valyala/fasthttp" ) @@ -148,6 +147,7 @@ func (c *Client) getHostClient(addr string, rewriteHost string) (*fasthttp.HostC return false }, } + //http2.ConfigureClient(hc, http2.ClientOpts{}) m[key] = hc if len(m) == 1 { go c.startCleaner(m) diff --git a/node/http-context/context.go b/node/http-context/context.go index ba570101..9494f592 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -49,8 +49,11 @@ type HttpContext struct { } func (ctx *HttpContext) BodyFinish() { - for _, finishFunc := range ctx.bodyFinishes { - finishFunc(ctx) + bodyFinishes := ctx.bodyFinishes + size := len(bodyFinishes) + // ##倒序执行 + for i := size - 1; i >= 0; i-- { + bodyFinishes[i](ctx) } } @@ -219,10 +222,12 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti // 流式传输 ctx.response.Response.SetStatusCode(response.StatusCode()) ctx.SetLabel("stream_running", "true") + ctx.response.Response.SetBodyStreamWriter(func(w *bufio.Writer) { defer func() { ctx.SetLabel("stream_running", "false") ctx.FastFinish() + fasthttp.ReleaseResponse(response) }() reader := response.BodyStream() buffer := make([]byte, 4096) // 4KB 缓冲区 @@ -262,11 +267,11 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti ctx.proxyRequests = append(ctx.proxyRequests, agent) return nil } - response.CopyTo(ctx.response.Response) agent.responseBody.Write(ctx.response.Response.Body()) agent.setResponseLength(ctx.response.Response.Header.ContentLength()) ctx.proxyRequests = append(ctx.proxyRequests, agent) + fasthttp.ReleaseResponse(response) return ctx.response.responseError } @@ -383,15 +388,14 @@ func (ctx *HttpContext) RequestId() string { // FastFinish finish func (ctx *HttpContext) FastFinish() { - streamRunning := ctx.GetLabel("stream_running") - if streamRunning == "true" { + if ctx.GetLabel("stream_running") == "true" || ctx.GetLabel("current_running") == "true" { // 暂时不释放 return } + if ctx.response.responseError != nil { ctx.fastHttpRequestCtx.SetStatusCode(504) ctx.fastHttpRequestCtx.SetBodyString(ctx.response.responseError.Error()) - return } ctx.port = 0 diff --git a/node/http-context/response.go b/node/http-context/response.go index 3554bbf2..499e3f6a 100644 --- a/node/http-context/response.go +++ b/node/http-context/response.go @@ -77,7 +77,7 @@ func (r *Response) Finish() error { r.Response = nil r.responseError = nil r.proxyStatusCode = 0 - r.streamBody = nil + r.streamBody.Reset() return nil } func (r *Response) reset(resp *fasthttp.Response) { @@ -85,7 +85,10 @@ func (r *Response) reset(resp *fasthttp.Response) { r.ResponseHeader.reset(&resp.Header) r.responseError = nil r.proxyStatusCode = 0 - r.streamBody = &bytes.Buffer{} + if r.streamBody == nil { + r.streamBody = &bytes.Buffer{} + } + r.streamBody.Reset() } func (r *Response) BodyLen() int { @@ -93,15 +96,15 @@ func (r *Response) BodyLen() int { } func (r *Response) GetBody() []byte { + if r.IsBodyStream() { + return r.streamBody.Bytes() + } if strings.Contains(r.GetHeader("Content-Encoding"), "gzip") { body, _ := r.BodyGunzip() r.DelHeader("Content-Encoding") r.SetHeader("Content-Length", strconv.Itoa(len(body))) r.Response.SetBody(body) } - if r.IsBodyStream() { - return r.streamBody.Bytes() - } return r.Response.Body() } @@ -115,6 +118,7 @@ func (r *Response) SetBody(bytes []byte) { // 不处理 return } + if strings.Contains(r.GetHeader("Content-Encoding"), "gzip") { r.DelHeader("Content-Encoding") }