1. 修复自动重定向插件bug

2. loki新增输出字段
3. influxdb新增输出字段
This commit is contained in:
Liujian
2025-05-06 17:48:24 +08:00
parent a9be828021
commit 2a0e090f44
9 changed files with 116 additions and 29 deletions

View File

@@ -61,8 +61,12 @@ type Error struct {
Code string `json:"code"`
}
func getTokens(text string) int {
tkm, _ := tiktoken.GetEncoding("cl100k_base") // 使用 OpenAI 的分词模型
func getTokens(text string, model string) int {
tkm, err := tiktoken.EncodingForModel(model)
if err != nil {
tkm, _ = tiktoken.GetEncoding("cl100k_base") // 使用 OpenAI 的分词模型
}
tokens := tkm.Encode(text, nil, nil)
return len(tokens)
}

View File

@@ -78,7 +78,7 @@ func (o *OpenAIConvert) RequestConvert(ctx eoscContext.EoContext, extender map[s
chatRequest.Config.Model = GetAIModel(ctx)
}
for _, msg := range chatRequest.Config.Messages {
promptToken += getTokens(msg.Content)
promptToken += getTokens(msg.Content, chatRequest.Config.Model)
}
SetAIModelInputToken(httpContext, promptToken)
httpContext.Proxy().AppendStreamBodyHandle(o.streamHandler)
@@ -232,7 +232,7 @@ func (o *OpenAIConvert) streamHandler(ctx http_service.IHttpContext, p []byte) (
return p, nil
}
if len(resp.Choices) > 0 {
outputToken += getTokens(resp.Choices[0].Delta.Content)
outputToken += getTokens(resp.Choices[0].Delta.Content, resp.Model)
totalToken += outputToken
}
if resp.Usage != nil {

View File

@@ -160,6 +160,7 @@ func (o *Output) doLoop() {
return
}
data, _ := json.Marshal(entry)
log.Infof("send data to loki: %s", string(data))
req, err := o.genRequest(data)
if err != nil {
log.Errorf("gen request error: %v,data is %s", err, string(data))

View File

@@ -52,7 +52,9 @@ func (r *handler) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.ICh
err := next.DoChain(ctx)
if err != nil {
log.Error(err)
return err
if !r.autoRedirect || !fasthttp.StatusCodeIsRedirect(ctx.Response().StatusCode()) {
return err
}
}
}
if !r.autoRedirect {
@@ -73,6 +75,9 @@ func (r *handler) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.ICh
return err
}
}
if !fasthttp.StatusCodeIsRedirect(ctx.Response().StatusCode()) {
return nil
}
return fmt.Errorf("too many redirects")
}
@@ -96,7 +101,10 @@ func redirect(ctx http_service.IHttpContext) error {
ctx.SetBalance(balanceHandler)
}
ctx.Proxy().URI().SetPath(u.Path)
ctx.Proxy().URI().SetRawQuery(u.Query().Encode())
for k, v := range u.Query() {
ctx.Proxy().URI().SetQuery(k, v[0])
}
//ctx.Proxy().URI().SetRawQuery(u.Query().Encode())
//ctx.Proxy().URI().SetPath(u.RawPath)
err = ctx.GetComplete().Complete(ctx)

View File

@@ -21,7 +21,15 @@ func (l *worker) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err e
return http_service.DoHttpFilter(l, ctx, next)
}
func (l *worker) bodyFinish(ctx http_service.IHttpContext) {
points := monitor_entry.ReadProxy(ctx)
points = append(points, monitor_entry.ReadRequest(ctx)...)
monitorManager.Output(l.Id(), points)
return
}
func (l *worker) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) {
ctx.Proxy().AppendBodyFinish(l.bodyFinish)
log.Debug("start monitor...")
apiID := ctx.GetLabel("api_id")
monitorManager.ConcurrencyAdd(apiID, 1)
@@ -29,9 +37,11 @@ func (l *worker) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.ICha
if err != nil {
log.Error(err)
}
points := monitor_entry.ReadProxy(ctx)
points = append(points, monitor_entry.ReadRequest(ctx)...)
monitorManager.Output(l.Id(), points)
if ctx.Response().IsBodyStream() {
return nil
}
l.bodyFinish(ctx)
return nil
}

View File

@@ -274,7 +274,21 @@ var (
}),
"body": Fields{
"": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {
return string(ctx.Response().GetBody()), true
body := string(ctx.Response().GetBody())
encoding := ctx.Response().Headers().Get("Content-Encoding")
if encoding == "gzip" {
reader, err := gzip.NewReader(bytes.NewReader([]byte(body)))
if err != nil {
return "", false
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return "", false
}
return string(data), true
}
return body, true
}),
},
"header": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) {

View File

@@ -1,17 +1,43 @@
package monitor_entry
import http_context "github.com/eolinker/eosc/eocontext/http-context"
var (
LabelApi = "api"
LabelApp = "app"
LabelUpstream = "upstream"
LabelHandler = "handler"
LabelProvider = "provider"
LabelApi = "api"
LabelApp = "app"
LabelUpstream = "upstream"
LabelHandler = "handler"
LabelProvider = "provider"
LabelAPIKind = "api_kind"
LabelStatusCode = "status_code"
)
var labels = map[string]string{
LabelApi: "api",
LabelApp: "application",
LabelHandler: "handler",
LabelUpstream: "service",
LabelProvider: "provider",
LabelApi: "api",
LabelApp: "application",
LabelHandler: "handler",
LabelUpstream: "service",
LabelProvider: "provider",
LabelAPIKind: "api_kind",
LabelStatusCode: "status_code",
}
type GetLabel func(ctx http_context.IHttpContext) string
var readLabel = map[string]GetLabel{
"status_code": func(ctx http_context.IHttpContext) string {
statusCode := ctx.Response().StatusCode()
switch {
case statusCode >= 200 && statusCode < 300:
return "2xx"
case statusCode >= 300 && statusCode < 400:
return "3xx"
case statusCode >= 400 && statusCode < 500:
return "4xx"
case statusCode >= 500 && statusCode < 600:
return "5xx"
default:
return "other"
}
},
}

View File

@@ -20,6 +20,9 @@ var requestFields = []string{
"retry",
"timing",
"status",
"input_token",
"output_token",
"total_token",
}
type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool)
@@ -31,6 +34,10 @@ func ReadRequest(ctx http_context.IHttpContext) []IPoint {
}
for key, label := range labels {
if v, ok := readLabel[key]; ok {
tags[label] = v(ctx)
continue
}
value := ctx.GetLabel(label)
if value == "" {
value = "-"
@@ -73,23 +80,39 @@ var request = map[string]RequestReadFunc{
"method": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Request().Method(), true
},
//"path": func(ctx http_context.IHttpContext) (interface{}, bool) {
// return ctx.Request().URI().Path(), true
//},
//"ip": func(ctx http_context.IHttpContext) (interface{}, bool) {
// return ctx.GetLabel("ip"), true
//},
"status": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Response().StatusCode(), true
},
"timing": func(ctx http_context.IHttpContext) (interface{}, bool) {
return time.Now().Sub(ctx.AcceptTime()).Milliseconds(), true
},
"input_token": func(ctx http_context.IHttpContext) (interface{}, bool) {
value := ctx.Value("ai_model_input_token")
if v, ok := value.(int); ok {
return v, true
}
return 0, true
},
"output_token": func(ctx http_context.IHttpContext) (interface{}, bool) {
value := ctx.Value("ai_model_output_token")
if v, ok := value.(int); ok {
return v, true
}
return 0, true
},
"total_token": func(ctx http_context.IHttpContext) (interface{}, bool) {
value := ctx.Value("ai_model_total_token")
if v, ok := value.(int); ok {
return v, true
}
return 0, true
},
"request": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Request().ContentLength(), true
},
"response": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Response().ContentLength(), true
return len(ctx.Response().GetBody()), true
},
"retry": func(ctx http_context.IHttpContext) (interface{}, bool) {
length := len(ctx.Proxies())

View File

@@ -2,11 +2,12 @@ package response
import (
"fmt"
"net/http"
http_entry "github.com/eolinker/apinto/entries/http-entry"
"github.com/eolinker/eosc/eocontext"
http_context "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/metrics"
"net/http"
)
type IResponse interface {
@@ -17,7 +18,7 @@ type Response struct {
StatusCode int `json:"status_code" label:"HTTP状态码"`
ContentType string `json:"content_type" label:"Content-Type"`
Charset string `json:"charset" label:"Charset"`
Headers []Header `json:"headers" label:"Header参数"` //key:value
Headers []Header `json:"header" label:"Header参数"` //key:value
Body string `json:"body" label:"Body" description:"body模版, 支持 #{label} 语法"`
}