influxdb批量完成

This commit is contained in:
Liujian
2023-01-11 15:01:01 +08:00
parent 6ec2f5de05
commit eda73a154b
6 changed files with 34 additions and 58 deletions

View File

@@ -1,7 +1,6 @@
package influxdbv2 package influxdbv2
import ( import (
"context"
"reflect" "reflect"
monitor_entry "github.com/eolinker/apinto/monitor-entry" monitor_entry "github.com/eolinker/apinto/monitor-entry"
@@ -15,17 +14,17 @@ import (
type Client struct { type Client struct {
id string id string
influxdb2.Client influxdb2.Client
api.WriteAPIBlocking api.WriteAPI
} }
func NewClient(cfg *Config) *Client { func NewClient(cfg *Config) *Client {
id := "" id := ""
client := influxdb2.NewClient(cfg.Url, cfg.Token) client := influxdb2.NewClient(cfg.Url, cfg.Token)
writeAPI := client.WriteAPI(cfg.Org, cfg.Bucket)
return &Client{ return &Client{
id, id,
client, client,
client.WriteAPIBlocking(cfg.Org, cfg.Bucket), writeAPI,
} }
} }
@@ -34,18 +33,20 @@ func (c *Client) ID() string {
} }
func (c *Client) Write(point monitor_entry.IPoint) error { func (c *Client) Write(point monitor_entry.IPoint) error {
if c.WriteAPIBlocking != nil { if c.WriteAPI != nil {
p, ok := point.(monitor_entry.IPoint) p, ok := point.(monitor_entry.IPoint)
if !ok { if !ok {
log.Error("need: ", reflect.TypeOf((monitor_entry.IPoint)(nil)), "now: ", reflect.TypeOf(point)) log.Error("need: ", reflect.TypeOf((monitor_entry.IPoint)(nil)), "now: ", reflect.TypeOf(point))
return nil return nil
} }
return c.WritePoint(context.Background(), influxdb2.NewPoint(
c.WritePoint(influxdb2.NewPoint(
p.Table(), p.Table(),
p.Tags(), p.Tags(),
p.Fields(), p.Fields(),
p.Time(), p.Time(),
)) ))
return nil
} }
return nil return nil
} }

View File

@@ -29,7 +29,7 @@ type httpHandler struct {
} }
func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) { func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
ctx.SetFinish(h.finisher)
httpContext, err := http_context.Assert(ctx) httpContext, err := http_context.Assert(ctx)
if err != nil { if err != nil {
return return
@@ -62,6 +62,6 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
ctx.SetApp(h.service) ctx.SetApp(h.service)
ctx.SetBalance(h.service) ctx.SetBalance(h.service)
ctx.SetUpstreamHostHandler(h.service) ctx.SetUpstreamHostHandler(h.service)
ctx.SetFinish(h.finisher)
h.filters.Chain(ctx, completeCaller) h.filters.Chain(ctx, completeCaller)
} }

View File

@@ -137,7 +137,10 @@ func (m *Manager) FastHandler(port int, ctx *fasthttp.RequestCtx) {
log.Debug("match has:", port) log.Debug("match has:", port)
r.ServeHTTP(httpContext) r.ServeHTTP(httpContext)
} }
httpContext.GetFinish().Finish(httpContext) finishHandler := httpContext.GetFinish()
if finishHandler != nil {
finishHandler.Finish(httpContext)
}
} }
type NotFoundHandler struct { type NotFoundHandler struct {

View File

@@ -1,17 +1,13 @@
package monitor_entry package monitor_entry
var ( var (
LabelNode = "node"
LabelCluster = "cluster"
LabelApi = "api" LabelApi = "api"
LabelApp = "app" LabelApp = "app"
LabelHandler = "handler"
LabelUpstream = "upstream" LabelUpstream = "upstream"
) )
var labels = map[string]string{ var labels = map[string]string{
LabelApi: "api_id", LabelApi: "api_id",
LabelApp: "application_id", LabelApp: "application_id",
LabelHandler: "handler",
LabelUpstream: "service_id", LabelUpstream: "service_id",
} }

View File

@@ -2,7 +2,6 @@ package monitor_entry
import ( import (
"fmt" "fmt"
"strconv"
"github.com/eolinker/eosc/utils" "github.com/eolinker/eosc/utils"
@@ -12,23 +11,17 @@ import (
type ProxyReadFunc func(request http_context.IProxy) (interface{}, bool) type ProxyReadFunc func(request http_context.IProxy) (interface{}, bool)
var proxyRequestMetrics = []string{
"ip",
"path",
}
var proxyMetrics = []string{ var proxyMetrics = []string{
"method", "method",
"host", "host",
"addr", "addr",
"path",
"status",
} }
var proxyFields = []string{ var proxyFields = []string{
"timing", "timing",
"request", "request",
"response", "response",
"status",
} }
func ReadProxy(ctx http_context.IHttpContext) []IPoint { func ReadProxy(ctx http_context.IHttpContext) []IPoint {
@@ -38,7 +31,6 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
globalLabels := utils.GlobalLabelGet() globalLabels := utils.GlobalLabelGet()
labelMetrics := map[string]string{ labelMetrics := map[string]string{
"request_id": ctx.RequestId(),
"cluster": globalLabels["cluster_id"], "cluster": globalLabels["cluster_id"],
"node": globalLabels["node_id"], "node": globalLabels["node_id"],
} }
@@ -50,24 +42,9 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
labelMetrics[key] = value labelMetrics[key] = value
} }
for _, key := range proxyRequestMetrics {
f, has := request[key]
if !has {
log.Error("proxy missing request tag function belong to ", key)
continue
}
v, has := f(ctx)
if !has {
continue
}
labelMetrics[fmt.Sprintf("request_%s", key)] = v.(string)
}
points := make([]IPoint, 0, len(ctx.Proxies())) points := make([]IPoint, 0, len(ctx.Proxies()))
for i, p := range ctx.Proxies() { for i, p := range ctx.Proxies() {
tags := map[string]string{ tags := map[string]string{}
"index": strconv.Itoa(i),
}
for key, value := range labelMetrics { for key, value := range labelMetrics {
tags[key] = value tags[key] = value
@@ -85,7 +62,9 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
tags[metrics] = v.(string) tags[metrics] = v.(string)
} }
fields := make(map[string]interface{}) fields := map[string]interface{}{
"index": i,
}
for _, field := range proxyFields { for _, field := range proxyFields {
f, has := proxy[field] f, has := proxy[field]
if !has { if !has {
@@ -110,14 +89,14 @@ var proxy = map[string]ProxyReadFunc{
"method": func(proxy http_context.IProxy) (interface{}, bool) { "method": func(proxy http_context.IProxy) (interface{}, bool) {
return proxy.Method(), true return proxy.Method(), true
}, },
"path": func(proxy http_context.IProxy) (interface{}, bool) { //"path": func(proxy http_context.IProxy) (interface{}, bool) {
return proxy.URI().Path(), true // return proxy.URI().Path(), true
}, //},
"addr": func(proxy http_context.IProxy) (interface{}, bool) { "addr": func(proxy http_context.IProxy) (interface{}, bool) {
return fmt.Sprintf("%s://%s", proxy.URI().Scheme(), proxy.URI().Host()), true return fmt.Sprintf("%s://%s", proxy.URI().Scheme(), proxy.URI().Host()), true
}, },
"status": func(proxy http_context.IProxy) (interface{}, bool) { "status": func(proxy http_context.IProxy) (interface{}, bool) {
return proxy.Status(), true return proxy.StatusCode(), true
}, },
"timing": func(proxy http_context.IProxy) (interface{}, bool) { "timing": func(proxy http_context.IProxy) (interface{}, bool) {
return proxy.ResponseTime(), true return proxy.ResponseTime(), true

View File

@@ -13,9 +13,6 @@ import (
var requestMetrics = []string{ var requestMetrics = []string{
"method", "method",
"host", "host",
"ip",
"path",
"status",
} }
var requestFields = []string{ var requestFields = []string{
@@ -23,6 +20,7 @@ var requestFields = []string{
"response", "response",
"retry", "retry",
"timing", "timing",
"status",
} }
type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool) type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool)
@@ -30,7 +28,6 @@ type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool)
func ReadRequest(ctx http_context.IHttpContext) []IPoint { func ReadRequest(ctx http_context.IHttpContext) []IPoint {
globalLabels := utils.GlobalLabelGet() globalLabels := utils.GlobalLabelGet()
tags := map[string]string{ tags := map[string]string{
"request_id": ctx.RequestId(),
"cluster": globalLabels["cluster_id"], "cluster": globalLabels["cluster_id"],
"node": globalLabels["node_id"], "node": globalLabels["node_id"],
} }
@@ -78,14 +75,14 @@ var request = map[string]RequestReadFunc{
"method": func(ctx http_context.IHttpContext) (interface{}, bool) { "method": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Request().Method(), true return ctx.Request().Method(), true
}, },
"path": func(ctx http_context.IHttpContext) (interface{}, bool) { //"path": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Request().URI().Path(), true // return ctx.Request().URI().Path(), true
}, //},
"ip": func(ctx http_context.IHttpContext) (interface{}, bool) { //"ip": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.GetLabel("ip"), true // return ctx.GetLabel("ip"), true
}, //},
"status": func(ctx http_context.IHttpContext) (interface{}, bool) { "status": func(ctx http_context.IHttpContext) (interface{}, bool) {
return ctx.Response().Status(), true return ctx.Response().StatusCode(), true
}, },
"timing": func(ctx http_context.IHttpContext) (interface{}, bool) { "timing": func(ctx http_context.IHttpContext) (interface{}, bool) {
return time.Now().Sub(ctx.AcceptTime()).Milliseconds(), true return time.Now().Sub(ctx.AcceptTime()).Milliseconds(), true