diff --git a/app/apinto/plugin.go b/app/apinto/plugin.go index d3618d34..d8e21df1 100644 --- a/app/apinto/plugin.go +++ b/app/apinto/plugin.go @@ -3,6 +3,7 @@ package main import ( "github.com/eolinker/apinto/drivers/plugins/app" "github.com/eolinker/apinto/drivers/plugins/cors" + data_transform "github.com/eolinker/apinto/drivers/plugins/data-transform" dubbo2_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/dubbo2-proxy-rewrite" extra_params "github.com/eolinker/apinto/drivers/plugins/extra-params" grpc_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/grpc-proxy-rewrite" @@ -66,6 +67,7 @@ func pluginRegister(extenderRegister eosc.IExtenderDriverRegister) { proxy_rewrite_v2.Register(extenderRegister) http_mocking.Register(extenderRegister) params_check.Register(extenderRegister) + data_transform.Register(extenderRegister) // 响应处理插件 response_rewrite.Register(extenderRegister) diff --git a/drivers/counter/counter.go b/drivers/counter/counter.go index 2bd467ef..478c0570 100644 --- a/drivers/counter/counter.go +++ b/drivers/counter/counter.go @@ -4,6 +4,8 @@ import ( "fmt" "reflect" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/utils/config" ) @@ -12,7 +14,7 @@ var ( ) type IClient interface { - Get(key string) (int64, error) + Get(variables eosc.Untyped[string, string]) (int64, error) } type ICounter interface { @@ -24,8 +26,8 @@ type ICounter interface { RollBack(count int64) error } -func GetRemainCount(client IClient, key string, count int64) (int64, error) { - remain, err := client.Get(key) +func GetRemainCount(client IClient, key string, count int64, variables eosc.Untyped[string, string]) (int64, error) { + remain, err := client.Get(variables) if err != nil { return 0, err } diff --git a/drivers/counter/http/executor.go b/drivers/counter/http/executor.go index 51960e61..49f32dc9 100644 --- a/drivers/counter/http/executor.go +++ b/drivers/counter/http/executor.go @@ -1,18 +1,19 @@ package http import ( - "bytes" "encoding/json" "fmt" "net/url" + "time" - scope_manager "github.com/eolinker/apinto/scope-manager" + "github.com/ohler55/ojg/oj" "github.com/ohler55/ojg/jp" + scope_manager "github.com/eolinker/apinto/scope-manager" + "github.com/eolinker/apinto/drivers" "github.com/eolinker/apinto/drivers/counter" - "github.com/ohler55/ojg/oj" "github.com/valyala/fasthttp" "github.com/eolinker/eosc" @@ -23,8 +24,12 @@ var _ eosc.IWorker = (*Executor)(nil) type Executor struct { drivers.WorkerBase - request *fasthttp.Request - expr jp.Expr + req *fasthttp.Request + contentType string + query map[string]string + header map[string]string + body map[string]string + expr jp.Expr } func (b *Executor) Start() error { @@ -48,32 +53,20 @@ func (b *Executor) reset(conf *Config) error { request := fasthttp.AcquireRequest() request.SetRequestURI(conf.URI) request.Header.SetMethod(conf.Method) - for key, value := range conf.Headers { - request.Header.Set(key, value) - } - for key, value := range conf.QueryParam { - request.URI().QueryArgs().Set(key, value) - } - if conf.ContentType == "json" { - request.Header.SetContentType("application/json") - body, _ := json.Marshal(conf.BodyParam) - request.SetBody(body) - } else { - request.Header.SetContentType("application/x-www-form-urlencoded") - bodyParams := url.Values{} - for key, value := range conf.BodyParam { - bodyParams.Set(key, value) - } - request.SetBodyString(bodyParams.Encode()) - } - b.request = request + + b.contentType = conf.ContentType + b.header = conf.Headers + b.query = conf.QueryParam + b.body = conf.BodyParam + + b.req = request b.expr = expr scope_manager.Set(b.Id(), b, conf.Scopes...) return nil } func (b *Executor) Stop() error { - fasthttp.ReleaseRequest(b.request) + fasthttp.ReleaseRequest(b.req) scope_manager.Del(b.Id()) return nil } @@ -86,21 +79,58 @@ var httpClient = fasthttp.Client{ Name: "apinto-counter", } -func (b *Executor) Get(key string) (int64, error) { +func (b *Executor) Get(variables eosc.Untyped[string, string]) (int64, error) { req := fasthttp.AcquireRequest() - b.request.CopyTo(req) + b.req.CopyTo(req) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + for key, value := range b.header { + v, ok := variables.Get(value) + if !ok { + v = value + } + req.Header.Set(key, v) + } + for key, value := range b.query { + v, ok := variables.Get(value) + if !ok { + v = value + } + req.URI().QueryArgs().Set(key, v) + } - req.URI().SetQueryStringBytes(bytes.Replace(req.URI().QueryString(), []byte("$key"), []byte(key), -1)) - req.SetBody(bytes.Replace(req.Body(), []byte("$key"), []byte(key), -1)) - - err := httpClient.Do(req, resp) + var body []byte + switch b.contentType { + case "json": + for key, value := range b.body { + v, ok := variables.Get(value) + if !ok { + v = value + } + b.body[key] = v + } + body, _ = json.Marshal(b.body) + req.Header.SetContentType("application/json") + case "form-data": + params := url.Values{} + for key, value := range b.body { + v, ok := variables.Get(value) + if !ok { + v = value + } + params.Add(key, v) + } + body = []byte(params.Encode()) + req.Header.SetContentType("application/x-www-form-urlencoded") + } + req.SetBody(body) + err := httpClient.DoTimeout(req, resp, 10*time.Second) if err != nil { return 0, err } if resp.StatusCode() != fasthttp.StatusOK { - return 0, fmt.Errorf("error http status code: %d,key: %s,id: %s", resp.StatusCode(), key, b.Id()) + return 0, fmt.Errorf("http status code is %d", resp.StatusCode()) } result, err := oj.Parse(resp.Body()) if err != nil { @@ -109,10 +139,10 @@ func (b *Executor) Get(key string) (int64, error) { // 解析JSON v := b.expr.Get(result) if v == nil || len(v) < 1 { - return 0, fmt.Errorf("no found key: %s,id: %s", key, b.Id()) + return 0, fmt.Errorf("json path %s not found,id is %d", b.expr.String(), b.Id()) } if len(v) != 1 { - return 0, fmt.Errorf("invalid value: %v,key: %s,id: %s", v, key, b.Id()) + return 0, fmt.Errorf("json path %s found more than one,id is %d", b.expr.String(), b.Id()) } return v[0].(int64), nil } diff --git a/drivers/output/kafka/producer.go b/drivers/output/kafka/producer.go index 8c9ed988..906f54fc 100644 --- a/drivers/output/kafka/producer.go +++ b/drivers/output/kafka/producer.go @@ -2,11 +2,12 @@ package kafka import ( "context" + "sync" + "github.com/Shopify/sarama" "github.com/eolinker/eosc" "github.com/eolinker/eosc/formatter" "github.com/eolinker/eosc/log" - "sync" ) type Producer interface { @@ -88,7 +89,7 @@ func (o *tProducer) output(entry eosc.IEntry) error { msg.Partition = o.conf.Partition } if o.conf.PartitionType == "hash" { - msg.Key = sarama.StringEncoder(entry.Read(o.conf.PartitionKey)) + msg.Key = sarama.StringEncoder(eosc.ReadStringFromEntry(entry, o.conf.PartitionKey)) } o.write(msg) diff --git a/drivers/output/nsq/pool.go b/drivers/output/nsq/pool.go index 9478ef68..cb3db3b9 100644 --- a/drivers/output/nsq/pool.go +++ b/drivers/output/nsq/pool.go @@ -3,10 +3,11 @@ package nsq import ( "context" "fmt" - "github.com/eolinker/eosc/log" - "github.com/nsqio/go-nsq" "sync/atomic" "time" + + "github.com/eolinker/eosc/log" + "github.com/nsqio/go-nsq" ) const ( @@ -29,7 +30,7 @@ type node struct { status int } -//Create +// Create func CreateProducerPool(addrs []string, authSecret string, conf map[string]interface{}) (*producerPool, error) { pool := &producerPool{ @@ -87,7 +88,7 @@ func (p *producerPool) PublishAsync(topic string, body []byte) error { log.Errorf("log output nsqd is invalid. nsqd_addr:%s error:%s", producerNode.producer.String(), err) continue } - break + return } log.Errorf("no available nsqd node. data: %s", fmt.Sprintf("topic:%s data:%s", topic, body)) }(n) @@ -95,7 +96,7 @@ func (p *producerPool) PublishAsync(topic string, body []byte) error { return nil } -//Check 检查节点状态 +// Check 检查节点状态 func (p *producerPool) Check() { ticker := time.NewTicker(time.Second * 30) diff --git a/drivers/plugins/counter/client_test.go b/drivers/plugins/counter/client_test.go index 182a195d..5fc1cb94 100644 --- a/drivers/plugins/counter/client_test.go +++ b/drivers/plugins/counter/client_test.go @@ -6,13 +6,15 @@ import ( "testing" "time" + "github.com/eolinker/eosc" + redis "github.com/go-redis/redis/v8" ) type demoClient struct { } -func (d *demoClient) Get(key string) (int64, error) { +func (d *demoClient) Get(variables eosc.Untyped[string, string]) (int64, error) { return 100, nil } diff --git a/drivers/plugins/counter/executor.go b/drivers/plugins/counter/executor.go index 0f1da966..6f97e1fa 100644 --- a/drivers/plugins/counter/executor.go +++ b/drivers/plugins/counter/executor.go @@ -51,7 +51,7 @@ func (b *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IC key := b.keyGenerate.Key(ctx) ct, has := b.counters.Get(key) if !has { - ct = NewRedisCounter(key, b.cache, b.client) + ct = NewRedisCounter(key, b.keyGenerate.Variables(ctx), b.cache, b.client) b.counters.Set(key, ct) } var count int64 = 1 diff --git a/drivers/plugins/counter/key.go b/drivers/plugins/counter/key.go index 9279691d..807aed96 100644 --- a/drivers/plugins/counter/key.go +++ b/drivers/plugins/counter/key.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/eolinker/eosc" + http_service "github.com/eolinker/eosc/eocontext/http-context" ) @@ -11,6 +13,7 @@ var _ IKeyGenerator = (*keyGenerate)(nil) type IKeyGenerator interface { Key(ctx http_service.IHttpContext) string + Variables(ctx http_service.IHttpContext) eosc.Untyped[string, string] } func newKeyGenerate(key string) *keyGenerate { @@ -40,6 +43,15 @@ type keyGenerate struct { variables []string } +func (k *keyGenerate) Variables(ctx http_service.IHttpContext) eosc.Untyped[string, string] { + variables := eosc.BuildUntyped[string, string]() + entry := ctx.GetEntry() + for _, v := range k.variables { + variables.Set(fmt.Sprintf("$%s", v), eosc.ReadStringFromEntry(entry, v)) + } + return variables +} + func (k *keyGenerate) Key(ctx http_service.IHttpContext) string { variables := make([]interface{}, 0, len(k.variables)) for _, v := range k.variables { diff --git a/drivers/plugins/counter/local.go b/drivers/plugins/counter/local.go index ff697ccc..0f8f59ff 100644 --- a/drivers/plugins/counter/local.go +++ b/drivers/plugins/counter/local.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/eolinker/eosc" + scope_manager "github.com/eolinker/apinto/scope-manager" "github.com/eolinker/apinto/drivers/counter" @@ -14,8 +16,8 @@ import ( var _ counter.ICounter = (*LocalCounter)(nil) -func NewLocalCounter(key string, client scope_manager.IProxyOutput[counter.IClient]) *LocalCounter { - return &LocalCounter{key: key, client: client} +func NewLocalCounter(key string, variables eosc.Untyped[string, string], client scope_manager.IProxyOutput[counter.IClient]) *LocalCounter { + return &LocalCounter{key: key, client: client, variables: variables} } // LocalCounter 本地计数器 @@ -28,6 +30,8 @@ type LocalCounter struct { locker sync.Mutex + variables eosc.Untyped[string, string] + resetTime time.Time client scope_manager.IProxyOutput[counter.IClient] @@ -47,7 +51,7 @@ func (c *LocalCounter) Lock(count int64) error { c.resetTime = now for _, client := range c.client.List() { // 获取最新的次数 - remain, err = counter.GetRemainCount(client, c.key, count) + remain, err = counter.GetRemainCount(client, c.key, count, c.variables) if err != nil { log.Errorf("get remain count error: %s", err.Error()) continue diff --git a/drivers/plugins/counter/redis.go b/drivers/plugins/counter/redis.go index f64ccb5a..73f8eb95 100644 --- a/drivers/plugins/counter/redis.go +++ b/drivers/plugins/counter/redis.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/eolinker/eosc" + scope_manager "github.com/eolinker/apinto/scope-manager" "github.com/eolinker/apinto/resources" @@ -33,19 +35,22 @@ type RedisCounter struct { lockerKey string lockKey string remainKey string + + variables eosc.Untyped[string, string] } -func NewRedisCounter(key string, redis scope_manager.IProxyOutput[resources.ICache], client scope_manager.IProxyOutput[counter.IClient]) *RedisCounter { +func NewRedisCounter(key string, variables eosc.Untyped[string, string], redis scope_manager.IProxyOutput[resources.ICache], client scope_manager.IProxyOutput[counter.IClient]) *RedisCounter { return &RedisCounter{ key: key, redis: redis, client: client, - localCounter: NewLocalCounter(key, client), + localCounter: NewLocalCounter(key, variables, client), ctx: context.Background(), lockerKey: fmt.Sprintf("%s:locker", key), lockKey: fmt.Sprintf("%s:lock", key), remainKey: fmt.Sprintf("%s:remain", key), + variables: variables, } } @@ -110,7 +115,7 @@ func (r *RedisCounter) lock(cache resources.ICache, count int64) error { } lock, _ := strconv.ParseInt(lockCount, 10, 64) for _, client := range r.client.List() { - remain, err = counter.GetRemainCount(client, r.key, count+lock) + remain, err = counter.GetRemainCount(client, r.key, count+lock, r.variables) if err != nil { log.Errorf("get remain count error: %s", err) continue diff --git a/drivers/plugins/data-transform/config.go b/drivers/plugins/data-transform/config.go new file mode 100644 index 00000000..a7a8d85d --- /dev/null +++ b/drivers/plugins/data-transform/config.go @@ -0,0 +1,24 @@ +package data_transform + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +type Config struct { + RequestTransform bool `json:"request_transform" label:"请求转换"` + ResponseTransform bool `json:"response_transform" label:"响应转换"` + XMLRootTag string `json:"xml_root_tag" label:"XML根标签"` + XMLDeclaration map[string]string `json:"xml_declaration" label:"XML声明"` + ErrorType string `json:"error_type" label:"报错数据类型" default:"json" enum:"json,xml"` +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + + bc := &executor{ + WorkerBase: drivers.Worker(id, name), + conf: conf, + } + + return bc, nil +} diff --git a/drivers/plugins/data-transform/executor.go b/drivers/plugins/data-transform/executor.go new file mode 100644 index 00000000..cd096bf3 --- /dev/null +++ b/drivers/plugins/data-transform/executor.go @@ -0,0 +1,111 @@ +package data_transform + +import ( + "mime" + "net/http" + "strings" + + "github.com/eolinker/eosc/log" + + "github.com/eolinker/apinto/drivers" + + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +var _ http_service.HttpFilter = (*executor)(nil) +var _ eocontext.IFilter = (*executor)(nil) + +type executor struct { + drivers.WorkerBase + conf *Config +} + +func (b *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return http_service.DoHttpFilter(b, ctx, next) +} + +func (b *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) error { + if b.conf.RequestTransform && (ctx.Proxy().Method() == http.MethodPost || ctx.Proxy().Method() == http.MethodPut || ctx.Proxy().Method() == http.MethodPatch) { + // 对请求体做转换 + body, _ := ctx.Proxy().Body().RawBody() + contentType, _, _ := mime.ParseMediaType(ctx.Request().ContentType()) + if strings.Contains(contentType, "/json") { + result, err := json2xml(body, b.conf.XMLRootTag, b.conf.XMLDeclaration) + if err != nil { + errInfo := "fail to transform request json to xml" + ctx.Response().SetStatus(http.StatusBadRequest, "400") + ctx.Response().SetBody([]byte(encode(b.conf.ErrorType, errInfo, http.StatusBadRequest))) + log.Errorf("%s,body is %s", errInfo, string(body)) + return err + } + ctx.Proxy().Body().SetRaw("application/xml", result) + } else if strings.Contains(contentType, "/xml") { + result, err := xml2json(body, b.conf.XMLDeclaration) + if err != nil { + errInfo := "fail to transform request xml to json" + ctx.Response().SetStatus(http.StatusBadRequest, "400") + ctx.Response().SetBody([]byte(encode(b.conf.ErrorType, errInfo, http.StatusBadRequest))) + log.Errorf("%s,body is %s", errInfo, string(body)) + return err + } + ctx.Proxy().Body().SetRaw("application/json", result) + } + } + err := next.DoChain(ctx) + if err != nil { + return err + } + if b.conf.ResponseTransform { + // 对请求体做转换 + body := ctx.Response().GetBody() + contentType, _, _ := mime.ParseMediaType(ctx.Response().ContentType()) + if strings.Contains(contentType, "/json") { + result, err := json2xml(body, b.conf.XMLRootTag, b.conf.XMLDeclaration) + if err != nil { + errInfo := "fail to transform response json to xml" + ctx.Response().SetStatus(http.StatusBadRequest, "400") + ctx.Response().SetBody([]byte(encode(b.conf.ErrorType, errInfo, http.StatusBadRequest))) + log.Errorf("%s,body is %s", errInfo, string(body)) + return err + } + ctx.Response().SetBody(result) + ctx.Response().Headers().Set("Content-Type", "application/xml") + } else if strings.Contains(contentType, "/xml") { + result, err := xml2json(body, b.conf.XMLDeclaration) + if err != nil { + errInfo := "fail to transform response xml to json" + ctx.Response().SetStatus(http.StatusBadRequest, "400") + ctx.Response().SetBody([]byte(encode(b.conf.ErrorType, errInfo, http.StatusBadRequest))) + log.Errorf("%s,body is %s", errInfo, string(body)) + return err + } + ctx.Response().SetBody(result) + ctx.Response().Headers().Set("Content-Type", "application/json") + } + } + return nil +} + +func (b *executor) Start() error { + return nil +} + +func (b *executor) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + return nil +} + +func (b *executor) Stop() error { + b.Destroy() + return nil +} + +func (b *executor) Destroy() { + b.conf = nil + return +} + +func (b *executor) CheckSkill(skill string) bool { + return http_service.FilterSkillName == skill +} diff --git a/drivers/plugins/data-transform/factory.go b/drivers/plugins/data-transform/factory.go new file mode 100644 index 00000000..ec6026d7 --- /dev/null +++ b/drivers/plugins/data-transform/factory.go @@ -0,0 +1,18 @@ +package data_transform + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "data_transform" +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create) +} diff --git a/drivers/plugins/data-transform/utils.go b/drivers/plugins/data-transform/utils.go new file mode 100644 index 00000000..35c9c44f --- /dev/null +++ b/drivers/plugins/data-transform/utils.go @@ -0,0 +1,100 @@ +package data_transform + +import ( + "encoding/json" + "fmt" + "strings" + + . "github.com/clbanning/mxj" +) + +var declarations = []string{"version", "encoding", "standalone"} + +func xml2json(xmlVal []byte, xmlDeclaration map[string]string) ([]byte, error) { + m, err := NewMapXml(xmlVal) + if err != nil { + return nil, err + } + for _, key := range declarations { + if _, ok := m[key]; !ok { + if xmlDeclaration != nil { + if v, has := xmlDeclaration[key]; has { + m[key] = v + } + } + } + if v, ok := m[key]; ok { + vv, success := v.(string) + if success { + m[key] = strings.Replace(vv, `"`, "", -1) + } + } + } + + jsonObject, err := m.Json() + if err != nil { + return nil, err + } + return jsonObject, nil +} + +func json2xml(jsonVal []byte, rootTag string, xd map[string]string) ([]byte, error) { + var m, n map[string]interface{} + if err := json.Unmarshal(jsonVal, &m); err != nil { + return nil, err + } + n = make(map[string]interface{}) + for _, key := range declarations { + if v, ok := m[key]; ok { + n[key] = v + delete(m, key) + } else { + if xd != nil { + if v, ok := xd[key]; ok { + n[key] = v + } + } + + } + } + var x []byte + var err error + x, err = Map(m).XmlIndent("", " ", rootTag) + if err != nil { + return x, err + } + + declaration := xmlDeclaration(n) + retBody := []byte(fmt.Sprintf("%s\r\n%s", declaration, string(x))) + return retBody, nil +} + +func xmlDeclaration(m map[string]interface{}) string { + declaration := "" + + for _, a := range declarations { + if declaration == "" { + declaration = "" + } + return declaration +} + +func encode(ent string, origin string, statusCode int) string { + if ent == "json" { + tmp := map[string]interface{}{ + "message": origin, + "status_code": statusCode, + } + body, _ := json.Marshal(tmp) + return string(body) + } + return origin +} diff --git a/drivers/plugins/extra-params_v2/config.go b/drivers/plugins/extra-params_v2/config.go index 3901b312..0a690284 100644 --- a/drivers/plugins/extra-params_v2/config.go +++ b/drivers/plugins/extra-params_v2/config.go @@ -5,6 +5,8 @@ import ( "strconv" "strings" + "github.com/eolinker/eosc" + dynamic_params "github.com/eolinker/apinto/drivers/plugins/extra-params_v2/dynamic-params" http_service "github.com/eolinker/eosc/eocontext/http-context" ) @@ -124,7 +126,7 @@ func (b *paramInfo) Build(ctx http_service.IHttpContext, contentType string, par func (b *paramInfo) build(ctx http_service.IHttpContext, contentType string, params interface{}) (string, error) { if b.driver == nil { if b.systemValue { - return ctx.GetLabel(b.value), nil + return eosc.ReadStringFromEntry(ctx.GetEntry(), b.value), nil } return b.value, nil } diff --git a/drivers/plugins/extra-params_v2/dynamic-params/param.go b/drivers/plugins/extra-params_v2/dynamic-params/param.go index 82b96c94..649cd947 100644 --- a/drivers/plugins/extra-params_v2/dynamic-params/param.go +++ b/drivers/plugins/extra-params_v2/dynamic-params/param.go @@ -6,6 +6,8 @@ import ( "strconv" "strings" + "github.com/eolinker/eosc" + http_service "github.com/eolinker/eosc/eocontext/http-context" "github.com/ohler55/ojg/jp" "github.com/ohler55/ojg/oj" @@ -166,7 +168,7 @@ func retrieveParam(ctx http_service.IHttpContext, contentType string, body inter } } case positionSystem: - return ctx.GetLabel(value.key) + return eosc.ReadStringFromEntry(ctx.GetEntry(), value.key) } return "" } diff --git a/drivers/plugins/extra-params_v2/util.go b/drivers/plugins/extra-params_v2/util.go index deb27a2c..4e0d8f89 100644 --- a/drivers/plugins/extra-params_v2/util.go +++ b/drivers/plugins/extra-params_v2/util.go @@ -21,8 +21,7 @@ const ( var ( paramPositionErrInfo = `[plugin extra-params config err] param position must be in the set ["query","header",body]. err position: %s ` - //parseBodyErrInfo = `[extra_params] Fail to parse body! [err]: %s` - paramNameErrInfo = `[plugin extra-params config err] param name must be not null. ` + paramNameErrInfo = `[plugin extra-params config err] param name must be not null. ` ) func encodeErr(ent string, origin string, statusCode int) error { @@ -36,176 +35,3 @@ func encodeErr(ent string, origin string, statusCode int) error { } return errors.New(origin) } - -//func parseBodyParams(ctx http_service.IHttpContext) (interface{}, url.Values, error) { -// if ctx.Proxy().Method() != http.MethodPost && ctx.Proxy().Method() != http.MethodPut && ctx.Proxy().Method() != http.MethodPatch { -// return nil, nil, nil -// } -// contentType, _, _ := mime.ParseMediaType(ctx.Proxy().Body().ContentType()) -// switch contentType { -// case http_context.FormData, http_context.MultipartForm: -// formParams, err := ctx.Proxy().Body().BodyForm() -// if err != nil { -// return nil, nil, err -// } -// return nil, formParams, nil -// case http_context.JSON: -// body, err := ctx.Proxy().Body().RawBody() -// if err != nil { -// return nil, nil, err -// } -// if string(body) == "" { -// body = []byte("{}") -// } -// bodyParams, err := oj.Parse(body) -// return bodyParams, nil, err -// } -// return nil, nil, errors.New("unsupported content-type: " + contentType) -//} - -// -//func parseBodyParams(ctx http_service.IHttpContext) (map[string]interface{}, map[string][]string, error) { -// contentType, _, _ := mime.ParseMediaType(ctx.Proxy().Body().ContentType()) -// -// switch contentType { -// case http_context.FormData, http_context.MultipartForm: -// formParams, err := ctx.Proxy().Body().BodyForm() -// if err != nil { -// return nil, nil, err -// } -// return nil, formParams, nil -// case http_context.JSON: -// body, err := ctx.Proxy().Body().RawBody() -// if err != nil { -// return nil, nil, err -// } -// var bodyParams map[string]interface{} -// err = json.Unmarshal(body, &bodyParams) -// if err != nil { -// return bodyParams, nil, err -// } -// } -// return nil, nil, errors.New("[params_transformer] unsupported content-type: " + contentType) -//} - -//func getHeaderValue(headers map[string][]string, param *ExtraParam, value string) (string, error) { -// paramName := ConvertHeaderKey(param.Name) -// -// if param.Conflict == "" { -// param.Conflict = paramConvert -// } -// -// var paramValue string -// -// if _, ok := headers[paramName]; !ok { -// param.Conflict = paramConvert -// } else { -// paramValue = headers[paramName][0] -// } -// -// if param.Conflict == paramConvert { -// paramValue = value -// } else if param.Conflict == paramError { -// errInfo := `[extra_params] "` + param.Name + `" has a conflict.` -// return "", errors.New(errInfo) -// } -// -// return paramValue, nil -//} - -//func hasQueryValue(rawQuery string, paramName string) bool { -// bytes := []byte(rawQuery) -// if len(bytes) == 0 { -// return false -// } -// -// k := 0 -// for i, c := range bytes { -// switch c { -// case '=': -// key := string(bytes[k:i]) -// if key == paramName { -// return true -// } -// case '&': -// k = i + 1 -// } -// } -// -// return false -//} - -//func getQueryValue(ctx http_service.IHttpContext, param *ExtraParam, value string) (string, error) { -// paramValue := "" -// if param.Conflict == "" { -// param.Conflict = paramConvert -// } -// -// //判断请求中是否包含对应的query参数 -// if !hasQueryValue(ctx.Proxy().URI().RawQuery(), param.Name) { -// param.Conflict = paramConvert -// } else { -// paramValue = ctx.Proxy().URI().GetQuery(param.Name) -// } -// -// if param.Conflict == paramConvert { -// paramValue = value -// } else if param.Conflict == paramError { -// errInfo := `[extra_params] "` + param.Name + `" has a conflict.` -// return "", errors.New(errInfo) -// } -// -// return paramValue, nil -//} -// -//func getBodyValue(bodyParams map[string]interface{}, formParams map[string][]string, param *ExtraParam, contentType string, value interface{}) (interface{}, error) { -// var paramValue interface{} = nil -// Conflict := param.Conflict -// if Conflict == "" { -// Conflict = paramConvert -// } -// if strings.Contains(contentType, http_context.FormData) || strings.Contains(contentType, http_context.MultipartForm) { -// if _, ok := formParams[param.Name]; !ok { -// Conflict = paramConvert -// } else { -// paramValue = formParams[param.Name][0] -// } -// } else if strings.Contains(contentType, http_context.JSON) { -// if _, ok := bodyParams[param.Name]; !ok { -// param.Conflict = paramConvert -// } else { -// paramValue = bodyParams[param.Name] -// } -// } -// if Conflict == paramConvert { -// paramValue = value -// } else if Conflict == paramError { -// errInfo := `[extra_params] "` + param.Name + `" has a conflict.` -// return "", errors.New(errInfo) -// } -// -// return paramValue, nil -//} - -//func ConvertHeaderKey(header string) string { -// header = strings.ToLower(header) -// headerArray := strings.Split(header, "-") -// h := "" -// arrLen := len(headerArray) -// for i, value := range headerArray { -// vLen := len(value) -// if vLen < 1 { -// continue -// } else { -// if vLen == 1 { -// h += strings.ToUpper(value) -// } else { -// h += strings.ToUpper(string(value[0])) + value[1:] -// } -// if i != arrLen-1 { -// h += "-" -// } -// } -// } -// return h -//} diff --git a/drivers/router/dubbo2-router/config.go b/drivers/router/dubbo2-router/config.go index ea170221..97ea675f 100644 --- a/drivers/router/dubbo2-router/config.go +++ b/drivers/router/dubbo2-router/config.go @@ -8,15 +8,16 @@ import ( type Config struct { Listen int `json:"listen" yaml:"listen" title:"port" description:"使用端口" default:"80" label:"端口号" maximum:"65535"` - ServiceName string `json:"service_name" yaml:"service_name" label:"服务名"` - MethodName string `json:"method_name" yaml:"method_name" label:"方法名"` - Rules []Rule `json:"rules" yaml:"rules" label:"路由规则"` - Service eosc.RequireId `json:"service" yaml:"service" skill:"github.com/eolinker/apinto/service.service.IService" required:"true" label:"目标服务"` - Template eosc.RequireId `json:"template" yaml:"template" skill:"github.com/eolinker/apinto/template.template.ITemplate" required:"false" label:"插件模版"` - Disable bool `json:"disable" yaml:"disable" label:"禁用路由"` - Plugins plugin.Plugins `json:"plugins" yaml:"plugins" label:"插件配置"` - Retry int `json:"retry" label:"重试次数" yaml:"retry"` - TimeOut int `json:"time_out" label:"超时时间"` + ServiceName string `json:"service_name" yaml:"service_name" label:"服务名"` + MethodName string `json:"method_name" yaml:"method_name" label:"方法名"` + Rules []Rule `json:"rules" yaml:"rules" label:"路由规则"` + Service eosc.RequireId `json:"service" yaml:"service" skill:"github.com/eolinker/apinto/service.service.IService" required:"true" label:"目标服务"` + Template eosc.RequireId `json:"template" yaml:"template" skill:"github.com/eolinker/apinto/template.template.ITemplate" required:"false" label:"插件模版"` + Disable bool `json:"disable" yaml:"disable" label:"禁用路由"` + Plugins plugin.Plugins `json:"plugins" yaml:"plugins" label:"插件配置"` + Retry int `json:"retry" label:"重试次数" yaml:"retry"` + TimeOut int `json:"time_out" label:"超时时间"` + Labels map[string]string `json:"labels" label:"路由标签"` } // Rule 规则 diff --git a/drivers/router/dubbo2-router/handler.go b/drivers/router/dubbo2-router/handler.go index 12ff72bf..97a8a77d 100644 --- a/drivers/router/dubbo2-router/handler.go +++ b/drivers/router/dubbo2-router/handler.go @@ -26,6 +26,7 @@ type dubboHandler struct { filters eocontext.IChainPro retry int timeout time.Duration + labels map[string]string } var completeCaller = manager.NewCompleteCaller() @@ -41,6 +42,9 @@ func (d *dubboHandler) ServeHTTP(ctx eocontext.EoContext) { dubboCtx.Response().SetBody(manager.Dubbo2ErrorResult(errors.New("router disable"))) return } + for key, value := range d.labels { + ctx.SetLabel(key, value) + } //set retry timeout ctx.WithValue(ctx_key.CtxKeyRetry, d.retry) diff --git a/drivers/router/grpc-router/config.go b/drivers/router/grpc-router/config.go index 5bfa3bd8..4e756803 100644 --- a/drivers/router/grpc-router/config.go +++ b/drivers/router/grpc-router/config.go @@ -10,15 +10,16 @@ type Config struct { Host []string `json:"host" yaml:"host" label:"域名"` - ServiceName string `json:"service_name" yaml:"service_name" label:"服务名"` - MethodName string `json:"method_name" yaml:"method_name" label:"方法名"` - Rules []Rule `json:"rules" yaml:"rules" label:"路由规则"` - Service eosc.RequireId `json:"service" yaml:"service" skill:"github.com/eolinker/apinto/service.service.IService" required:"true" label:"目标服务"` - Template eosc.RequireId `json:"template" yaml:"template" skill:"github.com/eolinker/apinto/template.template.ITemplate" required:"false" label:"插件模版"` - Disable bool `json:"disable" yaml:"disable" label:"禁用路由"` - Plugins plugin.Plugins `json:"plugins" yaml:"plugins" label:"插件配置"` - Retry int `json:"retry" label:"重试次数" yaml:"retry"` - TimeOut int `json:"time_out" label:"超时时间"` + ServiceName string `json:"service_name" yaml:"service_name" label:"服务名"` + MethodName string `json:"method_name" yaml:"method_name" label:"方法名"` + Rules []Rule `json:"rules" yaml:"rules" label:"路由规则"` + Service eosc.RequireId `json:"service" yaml:"service" skill:"github.com/eolinker/apinto/service.service.IService" required:"true" label:"目标服务"` + Template eosc.RequireId `json:"template" yaml:"template" skill:"github.com/eolinker/apinto/template.template.ITemplate" required:"false" label:"插件模版"` + Disable bool `json:"disable" yaml:"disable" label:"禁用路由"` + Plugins plugin.Plugins `json:"plugins" yaml:"plugins" label:"插件配置"` + Retry int `json:"retry" label:"重试次数" yaml:"retry"` + TimeOut int `json:"time_out" label:"超时时间"` + Labels map[string]string `json:"labels" label:"路由标签"` } // Rule 规则 diff --git a/drivers/router/grpc-router/handler.go b/drivers/router/grpc-router/handler.go index 8648f377..6b2f3394 100644 --- a/drivers/router/grpc-router/handler.go +++ b/drivers/router/grpc-router/handler.go @@ -27,6 +27,7 @@ type grpcRouter struct { filters eocontext.IChainPro disable bool retry int + labels map[string]string timeout time.Duration } @@ -40,6 +41,9 @@ func (h *grpcRouter) ServeHTTP(ctx eocontext.EoContext) { grpcContext.FastFinish() return } + for key, value := range h.labels { + ctx.SetLabel(key, value) + } //set retry timeout ctx.WithValue(ctx_key.CtxKeyRetry, h.retry) diff --git a/drivers/router/grpc-router/router.go b/drivers/router/grpc-router/router.go index 55682582..e98727a6 100644 --- a/drivers/router/grpc-router/router.go +++ b/drivers/router/grpc-router/router.go @@ -58,6 +58,7 @@ func (h *GrpcRouter) reset(cfg *Config, workers map[eosc.RequireId]eosc.IWorker) retry: cfg.Retry, timeout: time.Duration(cfg.TimeOut) * time.Millisecond, disable: cfg.Disable, + labels: cfg.Labels, } if !cfg.Disable { diff --git a/drivers/router/http-router/config.go b/drivers/router/http-router/config.go index 7441ab64..cfb8e0b2 100644 --- a/drivers/router/http-router/config.go +++ b/drivers/router/http-router/config.go @@ -22,8 +22,9 @@ type Config struct { Disable bool `json:"disable" yaml:"disable" label:"禁用路由"` Plugins plugin.Plugins `json:"plugins" yaml:"plugins" label:"插件配置"` - Retry int `json:"retry" label:"重试次数" yaml:"retry" switch:"service!==''"` - TimeOut int `json:"time_out" label:"超时时间" switch:"service!==''"` + Retry int `json:"retry" label:"重试次数" yaml:"retry" switch:"service!==''"` + TimeOut int `json:"time_out" label:"超时时间" switch:"service!==''"` + Labels map[string]string `json:"labels" label:"路由标签"` } // Rule 规则 diff --git a/drivers/router/http-router/http-handler.go b/drivers/router/http-router/http-handler.go index abe50faa..9f50eeec 100644 --- a/drivers/router/http-router/http-handler.go +++ b/drivers/router/http-router/http-handler.go @@ -29,9 +29,9 @@ type httpHandler struct { filters eocontext.IChainPro disable bool websocket bool - - retry int - timeout time.Duration + labels map[string]string + retry int + timeout time.Duration } func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) { @@ -55,6 +55,11 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) { } ctx = wsCtx } + + for key, value := range h.labels { + // 设置标签 + ctx.SetLabel(key, value) + } //set retry timeout ctx.WithValue(ctx_key.CtxKeyRetry, h.retry) ctx.WithValue(ctx_key.CtxKeyTimeout, h.timeout) diff --git a/drivers/router/http-router/router.go b/drivers/router/http-router/router.go index 2618983d..1a72cfff 100644 --- a/drivers/router/http-router/router.go +++ b/drivers/router/http-router/router.go @@ -61,6 +61,7 @@ func (h *HttpRouter) reset(cfg *Config, workers map[eosc.RequireId]eosc.IWorker) disable: cfg.Disable, websocket: cfg.Websocket, retry: cfg.Retry, + labels: cfg.Labels, timeout: time.Duration(cfg.TimeOut) * time.Millisecond, } diff --git a/entries/http-entry/entry.go b/entries/http-entry/entry.go index fc3514f9..06c7e234 100644 --- a/entries/http-entry/entry.go +++ b/entries/http-entry/entry.go @@ -21,7 +21,7 @@ func NewEntry(ctx http_service.IHttpContext) *Entry { return &Entry{ctx: ctx} } -func (e *Entry) Read(pattern string) string { +func (e *Entry) Read(pattern string) interface{} { v, ok := rule.Read(pattern, e.ctx) if !ok { return "" @@ -50,7 +50,7 @@ type ChildEntry struct { childReader IReaderIndex } -func (c *ChildEntry) Read(pattern string) string { +func (c *ChildEntry) Read(pattern string) interface{} { if strings.HasPrefix(pattern, c.pre) { name := strings.TrimPrefix(pattern, c.pre) v, _ := c.childReader.ReadByIndex(c.index, name, c.parent.ctx) diff --git a/entries/http-entry/proxy-reader.go b/entries/http-entry/proxy-reader.go index 2250bde8..30d58525 100644 --- a/entries/http-entry/proxy-reader.go +++ b/entries/http-entry/proxy-reader.go @@ -1,15 +1,44 @@ package http_entry import ( + "strconv" + http_service "github.com/eolinker/eosc/eocontext/http-context" ) type IProxyReader interface { - ReadProxy(name string, proxy http_service.IProxy) (string, bool) + ReadProxy(name string, proxy http_service.IProxy) (interface{}, bool) } -type ProxyReadFunc func(name string, proxy http_service.IProxy) (string, bool) +func ReadProxyFromProxyReader(reader IProxyReader, proxy http_service.IProxy, key string) (string, bool) { + var data string + value, has := reader.ReadProxy(key, proxy) + if !has { + return "", false + } + switch v := value.(type) { + case string: + data = v + case []byte: + data = string(v) + case int: + data = strconv.Itoa(v) + case int64: + data = strconv.FormatInt(v, 10) + case float32: + data = strconv.FormatFloat(float64(v), 'f', -1, 32) + case float64: + data = strconv.FormatFloat(v, 'f', -1, 64) + case bool: + data = strconv.FormatBool(v) + default: + return "", false + } + return data, true +} -func (p ProxyReadFunc) ReadProxy(name string, proxy http_service.IProxy) (string, bool) { +type ProxyReadFunc func(name string, proxy http_service.IProxy) (interface{}, bool) + +func (p ProxyReadFunc) ReadProxy(name string, proxy http_service.IProxy) (interface{}, bool) { return p(name, proxy) } diff --git a/entries/http-entry/reader-index.go b/entries/http-entry/reader-index.go index 74ff2a9f..345232e0 100644 --- a/entries/http-entry/reader-index.go +++ b/entries/http-entry/reader-index.go @@ -7,16 +7,12 @@ import ( ) type IReaderIndex interface { - ReadByIndex(index int, name string, ctx http_service.IHttpContext) (string, bool) + ReadByIndex(index int, name string, ctx http_service.IHttpContext) (interface{}, bool) } type ProxyReaders map[string]IProxyReader -func (p ProxyReaders) ReadByIndex(index int, name string, ctx http_service.IHttpContext) (string, bool) { - v, ok := p[name] - if !ok { - return "", false - } +func (p ProxyReaders) ReadByIndex(index int, name string, ctx http_service.IHttpContext) (interface{}, bool) { proxies := ctx.Proxies() proxyLen := len(proxies) @@ -26,12 +22,22 @@ func (p ProxyReaders) ReadByIndex(index int, name string, ctx http_service.IHttp if index == -1 { index = proxyLen - 1 } - - return v.ReadProxy(name, proxies[index]) + v, ok := p[name] + if !ok { + fs := strings.SplitN(name, "_", 2) + if len(fs) == 2 { + v, ok = p[fs[0]] + if ok { + return v.ReadProxy(fs[1], proxies[index]) + } + } + return "", false + } + return v.ReadProxy("", proxies[index]) } -func (p ProxyReaders) Read(name string, ctx http_service.IHttpContext) (string, bool) { +func (p ProxyReaders) Read(name string, ctx http_service.IHttpContext) (interface{}, bool) { ns := strings.SplitN(name, "_", 2) v, ok := p[ns[0]] if !ok { diff --git a/entries/http-entry/reader.go b/entries/http-entry/reader.go index 067902be..84f405ed 100644 --- a/entries/http-entry/reader.go +++ b/entries/http-entry/reader.go @@ -2,6 +2,7 @@ package http_entry import ( "net/url" + "os" "strconv" "strings" "time" @@ -9,24 +10,22 @@ import ( "github.com/eolinker/apinto/utils/version" "github.com/eolinker/apinto/utils" - eosc_utils "github.com/eolinker/eosc/utils" - http_service "github.com/eolinker/eosc/eocontext/http-context" ) type IReader interface { - Read(name string, ctx http_service.IHttpContext) (string, bool) + Read(name string, ctx http_service.IHttpContext) (interface{}, bool) } -type ReadFunc func(name string, ctx http_service.IHttpContext) (string, bool) +type ReadFunc func(name string, ctx http_service.IHttpContext) (interface{}, bool) -func (f ReadFunc) Read(name string, ctx http_service.IHttpContext) (string, bool) { +func (f ReadFunc) Read(name string, ctx http_service.IHttpContext) (interface{}, bool) { return f(name, ctx) } type Fields map[string]IReader -func (f Fields) Read(name string, ctx http_service.IHttpContext) (string, bool) { +func (f Fields) Read(name string, ctx http_service.IHttpContext) (interface{}, bool) { r, has := f[name] if has { return r.Read("", ctx) @@ -43,191 +42,203 @@ func (f Fields) Read(name string, ctx http_service.IHttpContext) (string, bool) if label != "" { return label, true } + label = os.Getenv(name) - return "", false + return label, label != "" } var ( rule Fields = map[string]IReader{ - "request_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "request_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.RequestId(), true }), - "node": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - if value := ctx.GetLabel(name); value != "" { - return value, true - } - globalLabels := eosc_utils.GlobalLabelGet() - return globalLabels["node_id"], true + "node": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return os.Getenv("node_id"), true }), - "cluster": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - if value := ctx.GetLabel(name); value != "" { - return value, true - } - globalLabels := eosc_utils.GlobalLabelGet() - return globalLabels["cluster_id"], true + "cluster": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return os.Getenv("cluster_id"), true }), - "api_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "api_id": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.GetLabel("api_id"), true }), - "query": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "query": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return utils.QueryUrlEncode(ctx.Request().URI().RawQuery()), true } return url.QueryEscape(ctx.Request().URI().GetQuery(name)), true }), - "uri": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "uri": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { //不带请求参数的uri return ctx.Request().URI().Path(), true }), - "content_length": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "content_length": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().Header().GetHeader("content-length"), true }), - "content_type": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "content_type": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().Header().GetHeader("content-type"), true }), - "cookie": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "cookie": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return ctx.Request().Header().GetHeader("cookie"), true } return ctx.Request().Header().GetCookie(name), false }), - "msec": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - return strconv.FormatInt(time.Now().Unix(), 10), true + "msec": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.AcceptTime().UnixMilli(), true }), - "apinto_version": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "apinto_version": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return version.Version, true }), - "remote_addr": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "remote_addr": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().RemoteAddr(), true }), - "remote_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "remote_port": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().RemotePort(), true }), "request": Fields{ - "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { body, err := ctx.Request().Body().RawBody() if err != nil { return "", false } return string(body), true }), - "length": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "length": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { - return strconv.Itoa(len(ctx.Request().String())), true + return ctx.Request().ContentLength(), true }), - "method": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "method": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().Method(), true }), - "time": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "time": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { requestTime := ctx.Value("request_time") start, ok := requestTime.(time.Time) if !ok { return "", false } - return strconv.FormatInt(time.Now().Sub(start).Milliseconds(), 10), true + return time.Now().Sub(start).Milliseconds(), true }), - "uri": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "uri": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().URI().RequestURI(), true }), }, - "scheme": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "scheme": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().URI().Scheme(), true }), - "status": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - return ctx.Response().Status(), true + "status": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Response().StatusCode(), true }), - "time_iso8601": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "time_iso8601": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { //带毫秒的ISO-8601时间格式 - return time.Now().Format("2006-01-02T15:04:05.000Z07:00"), true + //return time.Now().Format("2006-01-02T15:04:05.000Z07:00"), true + return ctx.AcceptTime().Format("2006-01-02T15:04:05.000Z07:00"), true }), - "time_local": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - return time.Now().Format("2006-01-02 15:04:05"), true + "time_local": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + //return time.Now().Format("2006-01-02 15:04:05"), true + return ctx.AcceptTime().Format("2006-01-02 15:04:05"), true }), - "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return url.Values(ctx.Request().Header().Headers()).Encode(), true } return ctx.Request().Header().GetHeader(strings.Replace(name, "_", "-", -1)), true }), - "http": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "http": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().Header().GetHeader(strings.Replace(name, "_", "-", -1)), true }), - "host": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "host": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Request().URI().Host(), true }), - "error": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "error": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { //TODO 暂时忽略 return "", true }), "response": Fields{ - "": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Response().String(), true }), - "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "body": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return string(ctx.Response().GetBody()), true }), - "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "header": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { if name == "" { return url.Values(ctx.Response().Headers()).Encode(), true } return ctx.Response().GetHeader(strings.Replace(name, "_", "-", -1)), true }), - "status": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { + "status": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { return ctx.Response().ProxyStatus(), true }), - "time": ReadFunc(func(name string, ctx http_service.IHttpContext) (string, bool) { - responseTime := ctx.Response().ResponseTime() - return strconv.FormatInt(responseTime.Milliseconds(), 10), true + "time": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return ctx.Response().ResponseTime(), true + }), + "length": ReadFunc(func(name string, ctx http_service.IHttpContext) (interface{}, bool) { + return strconv.Itoa(ctx.Response().ContentLength()), true }), }, "proxy": proxyFields, } proxyFields = ProxyReaders{ - "header": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "header": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { if name == "" { return url.Values(proxy.Header().Headers()).Encode(), true } return proxy.Header().GetHeader(strings.Replace(name, "_", "-", -1)), true }), - "uri": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "uri": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().RequestURI(), true }), - "query": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "query": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { if name == "" { return utils.QueryUrlEncode(proxy.URI().RawQuery()), true } return url.QueryEscape(proxy.URI().GetQuery(name)), true }), - "body": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "body": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { body, err := proxy.Body().RawBody() if err != nil { return "", false } return string(body), true }), - "addr": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "addr": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().Host(), true }), - "scheme": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "scheme": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().Scheme(), true }), - "method": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "method": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.Method(), true }), - "status": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { - return proxy.Status(), true + "status": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.StatusCode(), true }), - "path": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "path": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.URI().Path(), true }), - "host": ProxyReadFunc(func(name string, proxy http_service.IProxy) (string, bool) { + "host": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { return proxy.Header().Host(), true }), + "request_length": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.ContentLength(), true + }), + "response_length": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.ResponseLength(), true + }), + "response_body": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.ResponseBody(), true + }), + "time": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.ResponseTime(), true + }), + "msec": ProxyReadFunc(func(name string, proxy http_service.IProxy) (interface{}, bool) { + return proxy.ProxyTime().UnixMilli(), true + }), } ) diff --git a/entries/metric-entry/metric_entry.go b/entries/metric-entry/metric_entry.go index 0aead49e..885cebff 100644 --- a/entries/metric-entry/metric_entry.go +++ b/entries/metric-entry/metric_entry.go @@ -2,12 +2,13 @@ package metric_entry import ( "fmt" + "strings" + http_entry "github.com/eolinker/apinto/entries/http-entry" "github.com/eolinker/eosc" "github.com/eolinker/eosc/eocontext" http_context "github.com/eolinker/eosc/eocontext/http-context" "github.com/eolinker/eosc/log" - "strings" ) type metricEntry struct { @@ -40,7 +41,7 @@ func (p *metricEntry) GetFloat(pattern string) (float64, bool) { func (p *metricEntry) Read(pattern string) string { //会先从rule里面读,若rule没有相应的pattern,会从ctx里面读label - value := p.iEntry.Read(pattern) + value := eosc.ReadStringFromEntry(p.iEntry, pattern) if value == "" { value = "-" } @@ -86,7 +87,7 @@ func (p *proxyMetricEntry) Read(pattern string) string { name := strings.TrimPrefix(pattern, p.prefix) f, exist := p.proxyReaders[name] if exist { - value, has := f.ReadProxy(name, p.proxy) + value, has := http_entry.ReadProxyFromProxyReader(f, p.proxy, name) if !has { value = "-" } diff --git a/entries/monitor-entry/proxy-reader.go b/entries/monitor-entry/proxy-reader.go index 4528a285..944cbc05 100644 --- a/entries/monitor-entry/proxy-reader.go +++ b/entries/monitor-entry/proxy-reader.go @@ -2,8 +2,7 @@ package monitor_entry import ( "fmt" - - "github.com/eolinker/eosc/utils" + "os" http_context "github.com/eolinker/eosc/eocontext/http-context" "github.com/eolinker/eosc/log" @@ -29,10 +28,10 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint { return make([]IPoint, 0, 1) } - globalLabels := utils.GlobalLabelGet() + //globalLabels := utils.GlobalLabelGet() labelMetrics := map[string]string{ - "cluster": globalLabels["cluster_id"], - "node": globalLabels["node_id"], + "cluster": os.Getenv("cluster_id"), + "node": os.Getenv("node_id"), } for key, label := range labels { value := ctx.GetLabel(label) diff --git a/entries/monitor-entry/request-reader.go b/entries/monitor-entry/request-reader.go index 0093f82f..0790cb5e 100644 --- a/entries/monitor-entry/request-reader.go +++ b/entries/monitor-entry/request-reader.go @@ -1,10 +1,9 @@ package monitor_entry import ( + "os" "time" - "github.com/eolinker/eosc/utils" - "github.com/eolinker/eosc/log" http_context "github.com/eolinker/eosc/eocontext/http-context" @@ -26,10 +25,9 @@ var requestFields = []string{ type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool) func ReadRequest(ctx http_context.IHttpContext) []IPoint { - globalLabels := utils.GlobalLabelGet() tags := map[string]string{ - "cluster": globalLabels["cluster_id"], - "node": globalLabels["node_id"], + "cluster": os.Getenv("cluster_id"), + "node": os.Getenv("node_id"), } for key, label := range labels { diff --git a/go.mod b/go.mod index 2d5c04e7..4119ff72 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,10 @@ go 1.19 require ( github.com/Shopify/sarama v1.32.0 github.com/brianvoe/gofakeit/v6 v6.20.1 + github.com/clbanning/mxj v1.8.4 github.com/coocood/freecache v1.2.2 github.com/dubbogo/gost v1.13.1 - github.com/eolinker/eosc v0.14.2 + github.com/eolinker/eosc v0.14.3 github.com/fasthttp/websocket v1.5.0 github.com/fullstorydev/grpcurl v1.8.7 github.com/go-redis/redis/v8 v8.11.5 diff --git a/monitor-manager/manager.go b/monitor-manager/manager.go index b51ef0cd..9e08a9b3 100644 --- a/monitor-manager/manager.go +++ b/monitor-manager/manager.go @@ -1,14 +1,15 @@ package monitor_manager import ( + "os" + "reflect" + "time" + "github.com/eolinker/apinto/entries/monitor-entry" "github.com/eolinker/apinto/scope-manager" "github.com/eolinker/eosc" "github.com/eolinker/eosc/common/bean" "github.com/eolinker/eosc/log" - "github.com/eolinker/eosc/utils" - "reflect" - "time" ) var _ IManager = (*MonitorManager)(nil) @@ -36,11 +37,10 @@ func (o *MonitorManager) RemoveCurrencyAPI(apiID string) { v, ok := o.concurrentApis.Del(apiID) if ok { now := time.Now() - globalLabel := utils.GlobalLabelGet() tags := map[string]string{ "api": apiID, - "cluster": globalLabel["cluster"], - "node": globalLabel["node"], + "cluster": os.Getenv("cluster_id"), + "node": os.Getenv("node_id"), } fields := map[string]interface{}{ "value": v.Get(), @@ -120,9 +120,8 @@ func (o *MonitorManager) proxyOutput(v scope_manager.IProxyOutput[monitor_entry. func (o *MonitorManager) genNodePoints() []monitor_entry.IPoint { now := time.Now() - globalLabel := utils.GlobalLabelGet() - cluster := globalLabel["cluster"] - node := globalLabel["node"] + cluster := os.Getenv("cluster_id") + node := os.Getenv("node_id") points := make([]monitor_entry.IPoint, 0, o.concurrentApis.Count()) for key, value := range o.concurrentApis.All() { tags := map[string]string{ diff --git a/node/fasthttp-client/client.go b/node/fasthttp-client/client.go index 4ffb7313..51ea8f2c 100644 --- a/node/fasthttp-client/client.go +++ b/node/fasthttp-client/client.go @@ -2,13 +2,14 @@ package fasthttp_client import ( "fmt" - "github.com/eolinker/eosc/eocontext" "net" "strconv" "strings" "sync" "time" + "github.com/eolinker/eosc/eocontext" + "github.com/valyala/fasthttp" ) diff --git a/node/http-context/clone.go b/node/http-context/clone.go index 0993b0d2..cf56ee32 100644 --- a/node/http-context/clone.go +++ b/node/http-context/clone.go @@ -6,6 +6,10 @@ import ( "net" "time" + http_entry "github.com/eolinker/apinto/entries/http-entry" + + "github.com/eolinker/eosc" + "github.com/valyala/fasthttp" "github.com/eolinker/eosc/utils/config" @@ -31,8 +35,15 @@ type cloneContext struct { balance eoscContext.BalanceHandler upstreamHostHandler eoscContext.UpstreamHostHandler labels map[string]string + entry eosc.IEntry + responseError error +} - responseError error +func (ctx *cloneContext) GetEntry() eosc.IEntry { + if ctx.entry == nil { + ctx.entry = http_entry.NewEntry(ctx) + } + return ctx.entry } func (ctx *cloneContext) RealIP() string { @@ -137,6 +148,7 @@ func (ctx *cloneContext) SendTo(scheme string, node eoscContext.INode, timeout t } else { agent.setStatusCode(ctx.response.Response.StatusCode()) } + agent.responseBody = string(ctx.response.Response.Body()) agent.setResponseLength(ctx.response.Response.Header.ContentLength()) diff --git a/node/http-context/context.go b/node/http-context/context.go index 9e4ebcbe..4c43f967 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -6,6 +6,10 @@ import ( "net" "time" + http_entry "github.com/eolinker/apinto/entries/http-entry" + + "github.com/eolinker/eosc" + "github.com/eolinker/apinto/entries/ctx_key" "github.com/eolinker/eosc/utils/config" @@ -36,6 +40,7 @@ type HttpContext struct { upstreamHostHandler eoscContext.UpstreamHostHandler labels map[string]string port int + entry eosc.IEntry } func (ctx *HttpContext) RealIP() string { @@ -70,6 +75,13 @@ func (ctx *HttpContext) SetBalance(handler eoscContext.BalanceHandler) { ctx.balance = handler } +func (ctx *HttpContext) GetEntry() eosc.IEntry { + if ctx.entry == nil { + ctx.entry = http_entry.NewEntry(ctx) + } + return ctx.entry +} + func (ctx *HttpContext) SetLabel(name, value string) { ctx.labels[name] = value } @@ -141,7 +153,7 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti ctx.response.ResponseHeader.refresh() agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode()) } - + agent.responseBody = string(ctx.response.Response.Body()) agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength()) ctx.proxyRequests = append(ctx.proxyRequests, agent) diff --git a/node/http-context/proxy-agent.go b/node/http-context/proxy-agent.go index d21cb942..24beff0d 100644 --- a/node/http-context/proxy-agent.go +++ b/node/http-context/proxy-agent.go @@ -16,11 +16,16 @@ type requestAgent struct { statusCode int status string responseLength int + responseBody string beginTime time.Time endTime time.Time hostAgent *UrlAgent } +func (a *requestAgent) ResponseBody() string { + return a.responseBody +} + func (a *requestAgent) ProxyTime() time.Time { return a.beginTime } diff --git a/node/http-context/websocket-context.go b/node/http-context/websocket-context.go index 1b7417f3..2dac1806 100644 --- a/node/http-context/websocket-context.go +++ b/node/http-context/websocket-context.go @@ -3,11 +3,13 @@ package http_context import ( "errors" "fmt" - eoscContext "github.com/eolinker/eosc/eocontext" "io" "net" "sync" + eoscContext "github.com/eolinker/eosc/eocontext" + "github.com/valyala/fasthttp" + "github.com/eolinker/eosc/log" "github.com/eolinker/eosc/utils/config" @@ -24,7 +26,9 @@ type WebsocketContext struct { upstreamConn net.Conn } -var upgrader = websocket.FastHTTPUpgrader{} +var upgrader = websocket.FastHTTPUpgrader{ + CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true }, +} func (w *WebsocketContext) Upgrade() error { err := upgrader.Upgrade(w.fastHttpRequestCtx, func(conn *websocket.Conn) {