proxy表指标:修改记录timestamp时间为转发开始时间

This commit is contained in:
Liujian
2022-12-22 15:53:48 +08:00
parent c7b564ec91
commit b93d293f44
26 changed files with 209 additions and 127 deletions

View File

@@ -218,22 +218,5 @@ func ApintoProfession() []*eosc.ProfessionConfig {
},
Mod: eosc.ProfessionConfig_Worker,
},
//{
// Name: "setting",
// Label: "setting",
// Desc: "系统设置",
// Dependencies: nil,
// AppendLabels: nil,
// Drivers: []*eosc.DriverConfig{
// {
// Id: "eolinker.com:apinto:plugin",
// Name: "plugin",
// Label: "plugin",
// Desc: "插件管理器",
// Params: nil,
// },
// },
// Mod: eosc.ProfessionConfig_Singleton,
//},
}
}

View File

@@ -3,7 +3,7 @@ package fileoutput
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc/common/bean"

View File

@@ -3,7 +3,7 @@ package httpoutput
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc/common/bean"

View File

@@ -3,7 +3,7 @@ package kafka
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc/common/bean"

View File

@@ -3,7 +3,7 @@ package nsq
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc/common/bean"

View File

@@ -3,10 +3,11 @@ package syslog
import (
"sync"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc/common/bean"
"github.com/eolinker/apinto/drivers"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
"github.com/eolinker/eosc"
)

View File

@@ -3,7 +3,7 @@ package access_log
import (
"fmt"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/apinto/output"

View File

@@ -3,7 +3,7 @@ package access_log
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"

View File

@@ -3,8 +3,9 @@ package access_log
import (
"reflect"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/apinto/drivers"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
http_entry "github.com/eolinker/apinto/http-entry"
"github.com/eolinker/apinto/output"
"github.com/eolinker/eosc"

View File

@@ -2,8 +2,10 @@ package monitor
import (
"fmt"
"reflect"
scope_manager "github.com/eolinker/apinto/scope-manager"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
monitor_entry "github.com/eolinker/apinto/monitor-entry"
"github.com/eolinker/eosc/log"
@@ -12,14 +14,6 @@ import (
"github.com/eolinker/eosc"
)
func check(v interface{}) (*Config, error) {
cfg, ok := v.(*Config)
if !ok {
return nil, eosc.ErrorConfigType
}
return cfg, nil
}
func getList(ids []eosc.RequireId) ([]interface{}, error) {
ls := make([]interface{}, 0, len(ids))
for _, id := range ids {
@@ -30,7 +24,7 @@ func getList(ids []eosc.RequireId) ([]interface{}, error) {
_, ok := worker.(monitor_entry.IOutput)
if !ok {
return nil, fmt.Errorf("%s:worker not implement IEntryOutput", string(id))
return nil, fmt.Errorf("%s:worker d not implement IEntryOutput,now %v", string(id), reflect.TypeOf(worker))
}
ls = append(ls, worker)
@@ -52,9 +46,9 @@ func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWork
if len(list) > 0 {
proxy := scope_manager.NewProxy()
proxy.Set(list)
outputManager.Set(id, proxy)
monitorManager.SetProxyOutput(id, proxy)
} else {
outputManager.Set(id, scopeManager.Get("monitor"))
monitorManager.SetProxyOutput(id, scopeManager.Get("monitor"))
}
return o, nil

View File

@@ -3,7 +3,9 @@ package monitor
import (
"sync"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
monitor_manager "github.com/eolinker/apinto/monitor-manager"
scope_manager "github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
@@ -17,6 +19,7 @@ const (
var (
workers eosc.IWorkers
scopeManager scope_manager.IManager
monitorManager monitor_manager.IManager
once sync.Once
)
@@ -38,6 +41,7 @@ func (f *Factory) Create(profession string, name string, label string, desc stri
once.Do(func() {
bean.Autowired(&workers)
bean.Autowired(&scopeManager)
bean.Autowired(&monitorManager)
})
return f.IExtenderDriverFactory.Create(profession, name, label, desc, params)

View File

@@ -22,13 +22,15 @@ func (l *worker) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err e
}
func (l *worker) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) {
apiID := ctx.GetLabel("api_id")
monitorManager.ConcurrencyAdd(apiID, 1)
err = next.DoChain(ctx)
if err != nil {
log.Error(err)
}
points := monitor_entry.ReadProxy(ctx)
points = append(points, monitor_entry.ReadRequest(ctx)...)
outputManager.Output(l.Id(), points)
monitorManager.Output(l.Id(), points)
return nil
}

View File

@@ -1,64 +0,0 @@
package monitor
import (
monitor_entry "github.com/eolinker/apinto/monitor-entry"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/log"
"reflect"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
)
var outputManager = NewOutputManager()
type OutputManager struct {
outputs eosc.Untyped[string, scope_manager.IProxyOutput]
pointChan chan point
}
func NewOutputManager() *OutputManager {
o := &OutputManager{
outputs: eosc.BuildUntyped[string, scope_manager.IProxyOutput](),
pointChan: make(chan point, 100),
}
go o.doLoop()
return o
}
type point struct {
id string
points []monitor_entry.IPoint
}
func (o *OutputManager) Set(id string, proxy scope_manager.IProxyOutput) {
o.outputs.Set(id, proxy)
}
func (o *OutputManager) Output(id string, ps []monitor_entry.IPoint) {
o.pointChan <- point{
id: id,
points: ps,
}
}
func (o *OutputManager) doLoop() {
for {
select {
case p, ok := <-o.pointChan:
if !ok {
return
}
v, has := o.outputs.Get(p.id)
if !has {
continue
}
for _, proxy := range v.List() {
out, ok := proxy.(monitor_entry.IOutput)
if !ok {
log.Error("error output type: ", reflect.TypeOf(proxy))
continue
}
out.Output(p.points...)
}
}
}
}

View File

@@ -4,9 +4,9 @@ import (
"reflect"
"sync"
"github.com/eolinker/eosc/common/bean"
scope_manager "github.com/eolinker/apinto/scope-manager"
scope_manager "github.com/eolinker/apinto/drivers/scope-manager"
"github.com/eolinker/eosc/common/bean"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"

View File

@@ -67,8 +67,6 @@ func (o *output) Output(metrics ...monitor_entry.IPoint) {
if o.metrics == nil {
return
}
log.Info("metrics chan size: ", cap(o.metrics))
log.Info("metrics chan length: ", len(o.metrics))
o.metrics <- metrics
}

View File

@@ -3,8 +3,6 @@ package http_router
import (
"net/http"
"github.com/eolinker/eosc/utils"
http_service "github.com/eolinker/apinto/node/http-context"
http_complete "github.com/eolinker/apinto/drivers/router/http-router/http-complete"
@@ -51,10 +49,7 @@ func (h *httpHandler) ServeHTTP(ctx eocontext.EoContext) {
}
ctx = wsCtx
}
globalLabels := utils.GlobalLabelGet()
for key, value := range globalLabels {
ctx.SetLabel(key, value)
}
//Set Label
ctx.SetLabel("api", h.routerName)
ctx.SetLabel("api_id", h.routerId)

1
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/eolinker/eosc v0.7.1
github.com/fasthttp/websocket v1.5.0
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/mock v1.3.1
github.com/google/uuid v1.3.0
github.com/hashicorp/consul/api v1.9.1
github.com/influxdata/influxdb-client-go/v2 v2.12.1

View File

@@ -10,8 +10,6 @@ var (
)
var labels = map[string]string{
LabelNode: "node_id",
LabelCluster: "cluster_id",
LabelApi: "api_id",
LabelApp: "application",
LabelHandler: "handler",

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"strconv"
"github.com/eolinker/eosc/utils"
http_context "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
)
@@ -28,8 +30,11 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
if len(ctx.Proxies()) < 1 {
return make([]IPoint, 0, 1)
}
globalLabels := utils.GlobalLabelGet()
labelMetrics := map[string]string{
"request_id": ctx.RequestId(),
"cluster": globalLabels["cluster_id"],
"node": globalLabels["node_id"],
}
for key, label := range labels {
labelMetrics[key] = ctx.GetLabel(label)
@@ -46,7 +51,7 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
for _, metrics := range proxyMetrics {
f, has := proxy[metrics]
if !has {
log.Error("missing function belong to ", metrics)
log.Error("proxy missing tag function belong to ", metrics)
continue
}
v, has := f(p)
@@ -60,7 +65,7 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
for _, field := range proxyFields {
f, has := proxy[field]
if !has {
log.Error("missing function belong to ", field)
log.Error("proxy missing field function belong to ", field)
continue
}
v, has := f(p)
@@ -69,7 +74,7 @@ func ReadProxy(ctx http_context.IHttpContext) []IPoint {
}
fields[field] = v
}
points = append(points, NewPoint("proxy", tags, fields, ctx.AcceptTime()))
points = append(points, NewPoint("proxy", tags, fields, p.ProxyTime()))
}
return points
}

View File

@@ -4,6 +4,8 @@ import (
"strings"
"time"
"github.com/eolinker/eosc/utils"
"github.com/eolinker/eosc/log"
http_context "github.com/eolinker/eosc/eocontext/http-context"
@@ -28,8 +30,11 @@ var requestFields = []string{
type RequestReadFunc func(ctx http_context.IHttpContext) (interface{}, bool)
func ReadRequest(ctx http_context.IHttpContext) []IPoint {
globalLabels := utils.GlobalLabelGet()
tags := map[string]string{
"request_id": ctx.RequestId(),
"cluster": globalLabels["cluster_id"],
"node": globalLabels["node_id"],
}
for key, label := range labels {

View File

@@ -0,0 +1,15 @@
package monitor_manager
import "sync/atomic"
type concurrency struct {
count int32
}
func (c *concurrency) Add(count int32) {
atomic.AddInt32(&c.count, count)
}
func (c *concurrency) Get() int32 {
return atomic.LoadInt32(&c.count)
}

139
monitor-manager/manager.go Normal file
View File

@@ -0,0 +1,139 @@
package monitor_manager
import (
"github.com/eolinker/apinto/monitor-entry"
"github.com/eolinker/apinto/scope-manager"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/common/bean"
"github.com/eolinker/eosc/log"
"github.com/eolinker/eosc/utils"
"reflect"
"time"
)
var _ IManager = (*MonitorManager)(nil)
type IManager interface {
SetProxyOutput(id string, proxy scope_manager.IProxyOutput)
ConcurrencyAdd(apiID string, count int32)
RemoveCurrencyAPI(apiID string)
Output(id string, ps []monitor_entry.IPoint)
}
var monitorManager = NewMonitorManager()
func init() {
bean.Injection(&monitorManager)
}
type MonitorManager struct {
outputs eosc.Untyped[string, scope_manager.IProxyOutput]
concurrentApis eosc.Untyped[string, *concurrency]
pointChan chan point
}
func (o *MonitorManager) RemoveCurrencyAPI(apiID string) {
v, ok := o.concurrentApis.Del(apiID)
if ok {
now := time.Now()
globalLabel := utils.GlobalLabelGet()
tags := map[string]string{
"api": apiID,
"cluster": globalLabel["cluster"],
"node": globalLabel["node"],
}
fields := map[string]interface{}{
"value": v.Get(),
}
p := monitor_entry.NewPoint("node", tags, fields, now)
for _, v := range o.outputs.List() {
o.proxyOutput(v, []monitor_entry.IPoint{p})
}
}
}
func NewMonitorManager() IManager {
o := &MonitorManager{
outputs: eosc.BuildUntyped[string, scope_manager.IProxyOutput](),
concurrentApis: eosc.BuildUntyped[string, *concurrency](),
pointChan: make(chan point, 100),
}
go o.doLoop()
return o
}
type point struct {
id string
points []monitor_entry.IPoint
}
func (o *MonitorManager) SetProxyOutput(id string, proxy scope_manager.IProxyOutput) {
o.outputs.Set(id, proxy)
}
func (o *MonitorManager) ConcurrencyAdd(id string, count int32) {
v, has := o.concurrentApis.Get(id)
if !has {
v = &concurrency{count: 0}
o.concurrentApis.Set(id, v)
}
v.Add(count)
}
func (o *MonitorManager) Output(id string, ps []monitor_entry.IPoint) {
o.pointChan <- point{
id: id,
points: ps,
}
}
func (o *MonitorManager) doLoop() {
ticket := time.NewTicker(1 * time.Second)
defer ticket.Stop()
for {
select {
case p, ok := <-o.pointChan:
if !ok {
return
}
v, has := o.outputs.Get(p.id)
if !has {
continue
}
o.proxyOutput(v, p.points)
case <-ticket.C:
ticket.Reset(1 * time.Second)
}
}
}
func (o *MonitorManager) proxyOutput(v scope_manager.IProxyOutput, ps []monitor_entry.IPoint) {
for _, proxy := range v.List() {
out, ok := proxy.(monitor_entry.IOutput)
if !ok {
log.Error("error output type: ", reflect.TypeOf(proxy))
continue
}
out.Output(ps...)
}
}
func (o *MonitorManager) genNodePoints() []monitor_entry.IPoint {
now := time.Now()
globalLabel := utils.GlobalLabelGet()
cluster := globalLabel["cluster"]
node := globalLabel["node"]
points := make([]monitor_entry.IPoint, 0, o.concurrentApis.Count())
for key, value := range o.concurrentApis.All() {
tags := map[string]string{
"api": key,
"cluster": cluster,
"node": node,
}
fields := map[string]interface{}{
"value": value.Get(),
}
points = append(points, monitor_entry.NewPoint("node", tags, fields, now))
}
return points
}

View File

@@ -135,10 +135,10 @@ func (ctx *HttpContext) SendTo(address string, timeout time.Duration) error {
request.URI().SetHost(targetHost)
}
now := time.Now()
beginTime := time.Now()
ctx.response.responseError = fasthttp_client.ProxyTimeout(address, request, &ctx.fastHttpRequestCtx.Response, timeout)
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, time.Now().Sub(now))
agent := newRequestAgent(&ctx.proxyRequest, host, scheme, beginTime, time.Now())
agent.setStatusCode(ctx.fastHttpRequestCtx.Response.StatusCode())
agent.setResponseLength(ctx.fastHttpRequestCtx.Response.Header.ContentLength())

View File

@@ -16,10 +16,15 @@ type requestAgent struct {
statusCode int
status string
responseLength int
responseTime time.Duration
beginTime time.Time
endTime time.Time
hostAgent *UrlAgent
}
func (a *requestAgent) ProxyTime() time.Time {
return a.beginTime
}
func (a *requestAgent) StatusCode() int {
return a.statusCode
}
@@ -43,12 +48,12 @@ func (a *requestAgent) setResponseLength(length int) {
}
}
func newRequestAgent(IRequest http_service.IRequest, host string, scheme string, responseTime time.Duration) *requestAgent {
return &requestAgent{IRequest: IRequest, host: host, scheme: scheme, responseTime: responseTime}
func newRequestAgent(IRequest http_service.IRequest, host string, scheme string, beginTime, endTime time.Time) *requestAgent {
return &requestAgent{IRequest: IRequest, host: host, scheme: scheme, beginTime: beginTime, endTime: endTime}
}
func (a *requestAgent) ResponseTime() int64 {
return a.responseTime.Milliseconds()
return a.endTime.Sub(a.beginTime).Milliseconds()
}
func (a *requestAgent) URI() http_service.IURIWriter {