diff --git a/drivers/resources/datasource/influxdbv2/client.go b/drivers/resources/datasource/influxdbv2/client.go index 13b1a1d1..d9cc9566 100644 --- a/drivers/resources/datasource/influxdbv2/client.go +++ b/drivers/resources/datasource/influxdbv2/client.go @@ -1,7 +1,6 @@ package influxdbv2 import ( - "context" "reflect" monitor_entry "github.com/eolinker/apinto/monitor-entry" @@ -15,17 +14,17 @@ import ( type Client struct { id string influxdb2.Client - api.WriteAPIBlocking + api.WriteAPI } func NewClient(cfg *Config) *Client { id := "" client := influxdb2.NewClient(cfg.Url, cfg.Token) - + writeAPI := client.WriteAPI(cfg.Org, cfg.Bucket) return &Client{ id, client, - client.WriteAPIBlocking(cfg.Org, cfg.Bucket), + writeAPI, } } @@ -34,18 +33,20 @@ func (c *Client) ID() string { } func (c *Client) Write(point monitor_entry.IPoint) error { - if c.WriteAPIBlocking != nil { + if c.WriteAPI != nil { p, ok := point.(monitor_entry.IPoint) if !ok { log.Error("need: ", reflect.TypeOf((monitor_entry.IPoint)(nil)), "now: ", reflect.TypeOf(point)) return nil } - return c.WritePoint(context.Background(), influxdb2.NewPoint( + + c.WritePoint(influxdb2.NewPoint( p.Table(), p.Tags(), p.Fields(), p.Time(), )) + return nil } return nil } diff --git a/drivers/router/http-router/http-handler.go b/drivers/router/http-router/http-handler.go index c4159a31..bc103c20 100644 --- a/drivers/router/http-router/http-handler.go +++ b/drivers/router/http-router/http-handler.go @@ -29,7 +29,7 @@ type httpHandler struct { } func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) { - ctx.SetFinish(h.finisher) + httpContext, err := http_context.Assert(ctx) if err != nil { return @@ -62,6 +62,6 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) { ctx.SetApp(h.service) ctx.SetBalance(h.service) ctx.SetUpstreamHostHandler(h.service) - + ctx.SetFinish(h.finisher) h.filters.Chain(ctx, completeCaller) } diff --git a/drivers/router/http-router/manager/manager.go b/drivers/router/http-router/manager/manager.go index 8710a6f8..ca79d47e 100644 --- a/drivers/router/http-router/manager/manager.go +++ b/drivers/router/http-router/manager/manager.go @@ -137,7 +137,10 @@ func (m *Manager) FastHandler(port int, ctx *fasthttp.RequestCtx) { log.Debug("match has:", port) r.ServeHTTP(httpContext) } - httpContext.GetFinish().Finish(httpContext) + finishHandler := httpContext.GetFinish() + if finishHandler != nil { + finishHandler.Finish(httpContext) + } } type NotFoundHandler struct { diff --git a/monitor-entry/label.go b/monitor-entry/label.go index fd839aff..2c03d0d4 100644 --- a/monitor-entry/label.go +++ b/monitor-entry/label.go @@ -1,17 +1,13 @@ package monitor_entry var ( - LabelNode = "node" - LabelCluster = "cluster" LabelApi = "api" LabelApp = "app" - LabelHandler = "handler" LabelUpstream = "upstream" ) var labels = map[string]string{ LabelApi: "api_id", LabelApp: "application_id", - LabelHandler: "handler", LabelUpstream: "service_id", } diff --git a/monitor-entry/proxy-reader.go b/monitor-entry/proxy-reader.go index e0a204ce..b46294da 100644 --- a/monitor-entry/proxy-reader.go +++ b/monitor-entry/proxy-reader.go @@ -2,7 +2,6 @@ package monitor_entry import ( "fmt" - "strconv" "github.com/eolinker/eosc/utils" @@ -12,23 +11,17 @@ import ( type ProxyReadFunc func(request http_context.IProxy) (interface{}, bool) -var proxyRequestMetrics = []string{ - "ip", - "path", -} - var proxyMetrics = []string{ "method", "host", "addr", - "path", - "status", } var proxyFields = []string{ "timing", "request", "response", + "status", } func ReadProxy(ctx http_context.IHttpContext) []IPoint { @@ -38,9 +31,8 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint { globalLabels := utils.GlobalLabelGet() labelMetrics := map[string]string{ - "request_id": ctx.RequestId(), - "cluster": globalLabels["cluster_id"], - "node": globalLabels["node_id"], + "cluster": globalLabels["cluster_id"], + "node": globalLabels["node_id"], } for key, label := range labels { value := ctx.GetLabel(label) @@ -50,24 +42,9 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint { labelMetrics[key] = value } - for _, key := range proxyRequestMetrics { - f, has := request[key] - if !has { - log.Error("proxy missing request tag function belong to ", key) - continue - } - v, has := f(ctx) - if !has { - continue - } - labelMetrics[fmt.Sprintf("request_%s", key)] = v.(string) - } - points := make([]IPoint, 0, len(ctx.Proxies())) for i, p := range ctx.Proxies() { - tags := map[string]string{ - "index": strconv.Itoa(i), - } + tags := map[string]string{} for key, value := range labelMetrics { tags[key] = value @@ -85,7 +62,9 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint { tags[metrics] = v.(string) } - fields := make(map[string]interface{}) + fields := map[string]interface{}{ + "index": i, + } for _, field := range proxyFields { f, has := proxy[field] if !has { @@ -110,14 +89,14 @@ var proxy = map[string]ProxyReadFunc{ "method": func(proxy http_context.IProxy) (interface{}, bool) { return proxy.Method(), true }, - "path": func(proxy http_context.IProxy) (interface{}, bool) { - return proxy.URI().Path(), true - }, + //"path": func(proxy http_context.IProxy) (interface{}, bool) { + // return proxy.URI().Path(), true + //}, "addr": func(proxy http_context.IProxy) (interface{}, bool) { return fmt.Sprintf("%s://%s", proxy.URI().Scheme(), proxy.URI().Host()), true }, "status": func(proxy http_context.IProxy) (interface{}, bool) { - return proxy.Status(), true + return proxy.StatusCode(), true }, "timing": func(proxy http_context.IProxy) (interface{}, bool) { return proxy.ResponseTime(), true diff --git a/monitor-entry/request-reader.go b/monitor-entry/request-reader.go index a75032db..0093f82f 100644 --- a/monitor-entry/request-reader.go +++ b/monitor-entry/request-reader.go @@ -13,9 +13,6 @@ import ( var requestMetrics = []string{ "method", "host", - "ip", - "path", - "status", } var requestFields = []string{ @@ -23,6 +20,7 @@ var requestFields = []string{ "response", "retry", "timing", + "status", } type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool) @@ -30,9 +28,8 @@ type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool) func ReadRequest(ctx http_context.IHttpContext) []IPoint { globalLabels := utils.GlobalLabelGet() tags := map[string]string{ - "request_id": ctx.RequestId(), - "cluster": globalLabels["cluster_id"], - "node": globalLabels["node_id"], + "cluster": globalLabels["cluster_id"], + "node": globalLabels["node_id"], } for key, label := range labels { @@ -78,14 +75,14 @@ var request = map[string]RequestReadFunc{ "method": func(ctx http_context.IHttpContext) (interface{}, bool) { return ctx.Request().Method(), true }, - "path": func(ctx http_context.IHttpContext) (interface{}, bool) { - return ctx.Request().URI().Path(), true - }, - "ip": func(ctx http_context.IHttpContext) (interface{}, bool) { - return ctx.GetLabel("ip"), true - }, + //"path": func(ctx http_context.IHttpContext) (interface{}, bool) { + // return ctx.Request().URI().Path(), true + //}, + //"ip": func(ctx http_context.IHttpContext) (interface{}, bool) { + // return ctx.GetLabel("ip"), true + //}, "status": func(ctx http_context.IHttpContext) (interface{}, bool) { - return ctx.Response().Status(), true + return ctx.Response().StatusCode(), true }, "timing": func(ctx http_context.IHttpContext) (interface{}, bool) { return time.Now().Sub(ctx.AcceptTime()).Milliseconds(), true