mirror of
https://github.com/eolinker/apinto
synced 2025-11-03 02:53:33 +08:00
Fix: AI provider call failure error not returning
This commit is contained in:
@@ -29,9 +29,8 @@ func (l *accessLog) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.I
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
entry := http_entry.NewEntry(ctx)
|
||||
|
||||
outputs := l.proxy.List()
|
||||
entry := http_entry.NewEntry(ctx)
|
||||
for _, v := range outputs {
|
||||
|
||||
err = v.Output(entry)
|
||||
|
||||
@@ -130,7 +130,12 @@ func (e *executor) DoHttpFilter(ctx http_context.IHttpContext, next eocontext.IC
|
||||
convert.SetAIModel(ctx, e.model)
|
||||
|
||||
if err := e.processKeyPool(ctx, cloneProxy, next); err != nil {
|
||||
return e.doBalance(ctx, cloneProxy, next) // Fallback to balance logic
|
||||
err = e.doBalance(ctx, cloneProxy, next) // Fallback to balance logic
|
||||
if err != nil {
|
||||
ctx.Response().SetBody([]byte(err.Error()))
|
||||
ctx.Response().SetStatus(400, "Bad Request")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -10,11 +10,8 @@ import (
|
||||
type Config struct {
|
||||
Key string `json:"key" label:"格式化Key" required:"true"`
|
||||
Cache eosc.RequireId `json:"cache" label:"缓存计数器" skill:"github.com/eolinker/apinto/resources.resources.ICache" required:"false"`
|
||||
//Counter eosc.RequireId `json:"counter" label:"计数器" skill:"github.com/eolinker/apinto/drivers/counter.counter.IClient" required:"false"`
|
||||
//CountPusher eosc.RequireId `json:"counterPusher" label:"计数推送器" skill:"github.com/eolinker/apinto/drivers/counter.counter.ICountPusher" required:"false"`
|
||||
Match Match `json:"match" label:"响应匹配规则"`
|
||||
Count *separator.CountRule `json:"count" label:"计数规则"`
|
||||
//CountMode string `json:"count_mode" label:"计数模式" enum:"local,redis"`
|
||||
}
|
||||
|
||||
type Match struct {
|
||||
|
||||
@@ -115,6 +115,20 @@ func (e *ExtraParams) access(ctx http_service.IHttpContext) (int, error) {
|
||||
err = encodeErr(e.errorType, err.Error(), clientErrStatusCode)
|
||||
return clientErrStatusCode, err
|
||||
}
|
||||
// 区分Cookie和普通Header
|
||||
if strings.ToLower(param.Name) == "cookie" {
|
||||
cookies := strings.Split(v, ";")
|
||||
for _, cookie := range cookies {
|
||||
cookie = strings.TrimSpace(cookie)
|
||||
if cookie == "" {
|
||||
continue
|
||||
}
|
||||
cookieParam := strings.Split(cookie, "=")
|
||||
ctx.Proxy().Header().SetCookie(cookieParam[0], cookieParam[1], 0)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
ctx.Proxy().Header().SetHeader(param.Name, value)
|
||||
}
|
||||
case "body":
|
||||
|
||||
@@ -3,31 +3,85 @@ package redis
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/eolinker/eosc"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/eolinker/eosc"
|
||||
redis "github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Addrs []string `json:"addrs" label:"redis 节点列表"`
|
||||
MasterName string `json:"master_name" label:"主节点名称"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
DB int `json:"db"`
|
||||
Mode string `json:"mode" enum:"cluster,single" label:"模式"`
|
||||
Scopes []string `json:"scopes" label:"资源组"`
|
||||
}
|
||||
|
||||
func (c *Config) connect() (*redis.ClusterClient, error) {
|
||||
func getClient(options *redis.UniversalOptions, mode string) redis.UniversalClient {
|
||||
if options.MasterName != "" {
|
||||
return redis.NewFailoverClient(options.Failover())
|
||||
} else if len(options.Addrs) > 1 || mode == "cluster" {
|
||||
return redis.NewClusterClient(options.Cluster())
|
||||
}
|
||||
simpleClient := redis.NewClient(options.Simple())
|
||||
ctx, cf := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cf()
|
||||
info := simpleClient.Info(ctx, "cluster")
|
||||
if info.Err() != nil || !strings.Contains(info.String(), "cluster_enabled:1") {
|
||||
return simpleClient
|
||||
}
|
||||
|
||||
nodes := simpleClient.ClusterNodes(context.Background())
|
||||
if nodes.Err() != nil {
|
||||
return simpleClient
|
||||
}
|
||||
_ = simpleClient.Close()
|
||||
nodesContent := nodes.String()
|
||||
nodesContent = strings.TrimPrefix(nodesContent, "cluster nodes: ")
|
||||
nodesContent = strings.TrimSpace(nodesContent)
|
||||
lines := strings.SplitN(nodesContent, "\n", -1)
|
||||
nodeAddrs := make([]string, 0, len(lines))
|
||||
for _, line := range lines {
|
||||
nodeAddrs = append(nodeAddrs, readAddr(line))
|
||||
}
|
||||
options.Addrs = nodeAddrs
|
||||
return redis.NewClusterClient(options.Cluster())
|
||||
}
|
||||
|
||||
func (c *Config) connect() (redis.UniversalClient, error) {
|
||||
if len(c.Addrs) == 0 {
|
||||
return nil, fmt.Errorf("addrs:%w", eosc.ErrorRequire)
|
||||
}
|
||||
nc := redis.NewClusterClient(&redis.ClusterOptions{
|
||||
options := &redis.UniversalOptions{
|
||||
Addrs: c.Addrs,
|
||||
MasterName: c.MasterName,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
})
|
||||
timeout, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
if err := nc.Ping(timeout).Err(); err != nil {
|
||||
nc.Close()
|
||||
DB: c.DB,
|
||||
}
|
||||
client := getClient(options, c.Mode)
|
||||
if client == nil {
|
||||
return nil, fmt.Errorf("get client error")
|
||||
}
|
||||
err := client.Ping(context.Background()).Err()
|
||||
if err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
return nc, nil
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func readAddr(line string) string {
|
||||
fields := strings.Fields(line)
|
||||
addr := fields[1]
|
||||
|
||||
index := strings.Index(addr, "@")
|
||||
if index > 0 {
|
||||
addr = addr[:index]
|
||||
}
|
||||
return addr
|
||||
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"github.com/eolinker/apinto/resources"
|
||||
scope_manager "github.com/eolinker/apinto/scope-manager"
|
||||
"github.com/eolinker/eosc"
|
||||
"github.com/go-redis/redis/v8"
|
||||
redis "github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -24,7 +24,7 @@ type Worker struct {
|
||||
resources.ICache
|
||||
resources.IVectors
|
||||
config *Config
|
||||
client *redis.ClusterClient
|
||||
client redis.UniversalClient
|
||||
|
||||
isRunning bool
|
||||
}
|
||||
|
||||
1
drivers/router/http-router/stream/complete.go
Normal file
1
drivers/router/http-router/stream/complete.go
Normal file
@@ -0,0 +1 @@
|
||||
package stream
|
||||
2
go.mod
2
go.mod
@@ -11,7 +11,7 @@ require (
|
||||
github.com/clbanning/mxj v1.8.4
|
||||
github.com/coocood/freecache v1.2.2
|
||||
github.com/dubbogo/gost v1.13.1
|
||||
github.com/eolinker/eosc v0.19.1
|
||||
github.com/eolinker/eosc v0.19.2
|
||||
github.com/fasthttp/websocket v1.5.0
|
||||
github.com/fullstorydev/grpcurl v1.8.7
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
|
||||
@@ -140,7 +140,7 @@ func (c *Client) getHostClient(addr string, rewriteHost string) (*fasthttp.HostC
|
||||
TLSConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
|
||||
StreamResponseBody: true,
|
||||
Dial: dial,
|
||||
MaxConns: DefaultMaxConns,
|
||||
MaxConnWaitTimeout: DefaultMaxConnWaitTimeout,
|
||||
@@ -200,7 +200,6 @@ func (c *Client) ProxyTimeout(addr string, host string, req *fasthttp.Request, r
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
request.URI().SetScheme(scheme)
|
||||
return client.DoTimeout(req, resp, timeout)
|
||||
|
||||
|
||||
@@ -183,7 +183,7 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t
|
||||
ctx.response.remoteIP = ip
|
||||
ctx.response.remotePort = port
|
||||
}
|
||||
agent.responseBody = string(ctx.response.Response.Body())
|
||||
agent.responseBody.Write(ctx.response.Response.Body())
|
||||
|
||||
agent.setResponseLength(ctx.response.Response.Header.ContentLength())
|
||||
|
||||
|
||||
@@ -182,33 +182,40 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
|
||||
}
|
||||
|
||||
beginTime := time.Now()
|
||||
ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, rewriteHost, node, request, &ctx.fastHttpRequestCtx.Response, timeout)
|
||||
var responseHeader fasthttp.ResponseHeader
|
||||
if ctx.response.Response != nil {
|
||||
responseHeader = ctx.response.Response.Header
|
||||
}
|
||||
response := fasthttp.AcquireResponse()
|
||||
ctx.response.responseError = fasthttp_client.ProxyTimeout(scheme, rewriteHost, node, request, response, timeout)
|
||||
|
||||
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, responseHeader, beginTime, time.Now())
|
||||
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, response.Header, beginTime, time.Now())
|
||||
|
||||
if ctx.response.responseError != nil {
|
||||
agent.setStatusCode(504)
|
||||
} else {
|
||||
ctx.response.ResponseHeader.refresh()
|
||||
|
||||
agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode())
|
||||
agent.setStatusCode(response.StatusCode())
|
||||
}
|
||||
|
||||
if ctx.fastHttpRequestCtx.Response.RemoteAddr() != nil {
|
||||
ip, port := parseAddr(ctx.fastHttpRequestCtx.Response.RemoteAddr().String())
|
||||
if response.RemoteAddr() != nil {
|
||||
ip, port := parseAddr(response.RemoteAddr().String())
|
||||
agent.setRemoteIP(ip)
|
||||
agent.setRemotePort(port)
|
||||
ctx.response.remoteIP = ip
|
||||
ctx.response.remotePort = port
|
||||
}
|
||||
agent.responseBody = string(ctx.response.Response.Body())
|
||||
|
||||
agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength())
|
||||
if response.IsBodyStream() {
|
||||
response.Header.CopyTo(&ctx.response.Response.Header)
|
||||
ctx.response.Response.SetStatusCode(response.StatusCode())
|
||||
reader := &Reader{requestId: ctx.requestID, reader: response.BodyStream(), agent: agent, resp: &ctx.response}
|
||||
ctx.response.Response.SetBodyStream(reader, -1)
|
||||
agent.setResponseLength(ctx.response.Response.Header.ContentLength())
|
||||
ctx.proxyRequests = append(ctx.proxyRequests, agent)
|
||||
return nil
|
||||
}
|
||||
|
||||
response.CopyTo(ctx.response.Response)
|
||||
agent.responseBody.Write(ctx.response.Response.Body())
|
||||
agent.setResponseLength(ctx.response.Response.Header.ContentLength())
|
||||
ctx.proxyRequests = append(ctx.proxyRequests, agent)
|
||||
return ctx.response.responseError
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package http_context
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
@@ -199,3 +200,11 @@ func (r *ResponseHeader) DelHeader(key string) {
|
||||
func (h *RequestHeader) GetCookie(key string) string {
|
||||
return string(h.header.Cookie(key))
|
||||
}
|
||||
|
||||
func (h *RequestHeader) SetCookie(key, value string, maxAge int) {
|
||||
h.header.SetCookie(key, value)
|
||||
|
||||
if maxAge > 0 {
|
||||
h.header.SetCookie(key, fmt.Sprintf("%s; Max-Age=%d", value, maxAge))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/valyala/fasthttp"
|
||||
@@ -20,7 +21,7 @@ type requestAgent struct {
|
||||
statusCode int
|
||||
status string
|
||||
responseLength int
|
||||
responseBody string
|
||||
responseBody strings.Builder
|
||||
beginTime time.Time
|
||||
endTime time.Time
|
||||
hostAgent *UrlAgent
|
||||
@@ -31,7 +32,7 @@ type requestAgent struct {
|
||||
}
|
||||
|
||||
func (a *requestAgent) ResponseBody() string {
|
||||
return a.responseBody
|
||||
return a.responseBody.String()
|
||||
}
|
||||
|
||||
func (a *requestAgent) ResponseHeaders() http.Header {
|
||||
|
||||
34
node/http-context/reader.go
Normal file
34
node/http-context/reader.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package http_context
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/eolinker/eosc/log"
|
||||
)
|
||||
|
||||
type Reader struct {
|
||||
reader io.Reader
|
||||
|
||||
agent *requestAgent
|
||||
record strings.Builder
|
||||
requestId string
|
||||
resp *Response
|
||||
}
|
||||
|
||||
func (r *Reader) Read(p []byte) (int, error) {
|
||||
n, err := r.reader.Read(p)
|
||||
if err != nil {
|
||||
log.Debug("read error:", err)
|
||||
log.DebugF("request id %s ,read body: %s", r.requestId, r.record.String())
|
||||
return 0, err
|
||||
}
|
||||
r.record.Write(p[:n])
|
||||
if r.agent != nil {
|
||||
r.agent.responseBody.Write(p[:n])
|
||||
}
|
||||
if r.resp != nil {
|
||||
r.resp.AppendBody(p[:n])
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package http_context
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -21,6 +22,11 @@ type Response struct {
|
||||
responseError error
|
||||
remoteIP string
|
||||
remotePort int
|
||||
bodyStream bytes.Buffer
|
||||
}
|
||||
|
||||
func (r *Response) AppendBody(body []byte) {
|
||||
r.bodyStream.Write(body)
|
||||
}
|
||||
|
||||
func (r *Response) ContentLength() int {
|
||||
@@ -57,7 +63,6 @@ func (r *Response) reset(resp *fasthttp.Response) {
|
||||
r.ResponseHeader.reset(&resp.Header)
|
||||
r.responseError = nil
|
||||
r.proxyStatusCode = 0
|
||||
|
||||
}
|
||||
|
||||
func (r *Response) BodyLen() int {
|
||||
@@ -71,11 +76,18 @@ func (r *Response) GetBody() []byte {
|
||||
r.SetHeader("Content-Length", strconv.Itoa(len(body)))
|
||||
r.Response.SetBody(body)
|
||||
}
|
||||
|
||||
if r.Response.IsBodyStream() {
|
||||
return r.bodyStream.Bytes()
|
||||
}
|
||||
return r.Response.Body()
|
||||
}
|
||||
|
||||
func (r *Response) SetBody(bytes []byte) {
|
||||
if r.Response.IsBodyStream() {
|
||||
// 不能设置body
|
||||
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.GetHeader("Content-Encoding"), "gzip") {
|
||||
r.DelHeader("Content-Encoding")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user