mirror of
				https://github.com/smallnest/rpcx.git
				synced 2025-11-01 11:52:42 +08:00 
			
		
		
		
	influxdb has changes its api
This commit is contained in:
		
							
								
								
									
										198
									
								
								serverplugin/influxdb.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										198
									
								
								serverplugin/influxdb.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,198 @@ | ||||
| package serverplugin | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	uurl "net/url" | ||||
| 	"time" | ||||
|  | ||||
| 	client "github.com/influxdata/influxdb1-client" | ||||
| 	"github.com/rcrowley/go-metrics" | ||||
| ) | ||||
|  | ||||
| type reporter struct { | ||||
| 	reg      metrics.Registry | ||||
| 	interval time.Duration | ||||
|  | ||||
| 	url      uurl.URL | ||||
| 	database string | ||||
| 	username string | ||||
| 	password string | ||||
| 	tags     map[string]string | ||||
|  | ||||
| 	client *client.Client | ||||
| } | ||||
|  | ||||
| // InfluxDB starts a InfluxDB reporter which will post the metrics from the given registry at each d interval. | ||||
| func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password string) { | ||||
| 	InfluxDBWithTags(r, d, url, database, username, password, nil) | ||||
| } | ||||
|  | ||||
| // InfluxDBWithTags starts a InfluxDB reporter which will post the metrics from the given registry at each d interval with the specified tags | ||||
| func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password string, tags map[string]string) { | ||||
| 	u, err := uurl.Parse(url) | ||||
| 	if err != nil { | ||||
| 		log.Printf("unable to parse InfluxDB url %s. err=%v", url, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	rep := &reporter{ | ||||
| 		reg:      r, | ||||
| 		interval: d, | ||||
| 		url:      *u, | ||||
| 		database: database, | ||||
| 		username: username, | ||||
| 		password: password, | ||||
| 		tags:     tags, | ||||
| 	} | ||||
| 	if err := rep.makeClient(); err != nil { | ||||
| 		log.Printf("unable to make InfluxDB client. err=%v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	rep.run() | ||||
| } | ||||
|  | ||||
| func (r *reporter) makeClient() (err error) { | ||||
| 	r.client, err = client.NewClient(client.Config{ | ||||
| 		URL:      r.url, | ||||
| 		Username: r.username, | ||||
| 		Password: r.password, | ||||
| 	}) | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (r *reporter) run() { | ||||
| 	intervalTicker := time.Tick(r.interval) | ||||
| 	pingTicker := time.Tick(time.Second * 5) | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-intervalTicker: | ||||
| 			if err := r.send(); err != nil { | ||||
| 				log.Printf("unable to send metrics to InfluxDB. err=%v", err) | ||||
| 			} | ||||
| 		case <-pingTicker: | ||||
| 			_, _, err := r.client.Ping() | ||||
| 			if err != nil { | ||||
| 				log.Printf("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err) | ||||
|  | ||||
| 				if err = r.makeClient(); err != nil { | ||||
| 					log.Printf("unable to make InfluxDB client. err=%v", err) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *reporter) send() error { | ||||
| 	var pts []client.Point | ||||
|  | ||||
| 	r.reg.Each(func(name string, i interface{}) { | ||||
| 		now := time.Now() | ||||
|  | ||||
| 		switch metric := i.(type) { | ||||
| 		case metrics.Counter: | ||||
| 			ms := metric.Snapshot() | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.count", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"value": ms.Count(), | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		case metrics.Gauge: | ||||
| 			ms := metric.Snapshot() | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.gauge", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"value": ms.Value(), | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		case metrics.GaugeFloat64: | ||||
| 			ms := metric.Snapshot() | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.gauge", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"value": ms.Value(), | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		case metrics.Histogram: | ||||
| 			ms := metric.Snapshot() | ||||
| 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.histogram", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"count":    ms.Count(), | ||||
| 					"max":      ms.Max(), | ||||
| 					"mean":     ms.Mean(), | ||||
| 					"min":      ms.Min(), | ||||
| 					"stddev":   ms.StdDev(), | ||||
| 					"variance": ms.Variance(), | ||||
| 					"p50":      ps[0], | ||||
| 					"p75":      ps[1], | ||||
| 					"p95":      ps[2], | ||||
| 					"p99":      ps[3], | ||||
| 					"p999":     ps[4], | ||||
| 					"p9999":    ps[5], | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		case metrics.Meter: | ||||
| 			ms := metric.Snapshot() | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.meter", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"count": ms.Count(), | ||||
| 					"m1":    ms.Rate1(), | ||||
| 					"m5":    ms.Rate5(), | ||||
| 					"m15":   ms.Rate15(), | ||||
| 					"mean":  ms.RateMean(), | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		case metrics.Timer: | ||||
| 			ms := metric.Snapshot() | ||||
| 			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) | ||||
| 			pts = append(pts, client.Point{ | ||||
| 				Measurement: fmt.Sprintf("%s.timer", name), | ||||
| 				Tags:        r.tags, | ||||
| 				Fields: map[string]interface{}{ | ||||
| 					"count":    ms.Count(), | ||||
| 					"max":      ms.Max(), | ||||
| 					"mean":     ms.Mean(), | ||||
| 					"min":      ms.Min(), | ||||
| 					"stddev":   ms.StdDev(), | ||||
| 					"variance": ms.Variance(), | ||||
| 					"p50":      ps[0], | ||||
| 					"p75":      ps[1], | ||||
| 					"p95":      ps[2], | ||||
| 					"p99":      ps[3], | ||||
| 					"p999":     ps[4], | ||||
| 					"p9999":    ps[5], | ||||
| 					"m1":       ms.Rate1(), | ||||
| 					"m5":       ms.Rate5(), | ||||
| 					"m15":      ms.Rate15(), | ||||
| 					"meanrate": ms.RateMean(), | ||||
| 				}, | ||||
| 				Time: now, | ||||
| 			}) | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	bps := client.BatchPoints{ | ||||
| 		Points:   pts, | ||||
| 		Database: r.database, | ||||
| 	} | ||||
|  | ||||
| 	_, err := r.client.Write(bps) | ||||
| 	return err | ||||
| } | ||||
| @@ -9,7 +9,6 @@ import ( | ||||
| 	"github.com/rcrowley/go-metrics/exp" | ||||
| 	"github.com/smallnest/rpcx/protocol" | ||||
| 	"github.com/smallnest/rpcx/server" | ||||
| 	influxdb "github.com/vrischmann/go-metrics-influxdb" | ||||
| ) | ||||
|  | ||||
| // MetricsPlugin has an issue. It changes seq of requests and it is wrong!!!! | ||||
| @@ -111,7 +110,7 @@ func (p *MetricsPlugin) Graphite(freq time.Duration, prefix string, addr *net.TC | ||||
| // 	p.InfluxDB(10e9, "http://127.0.0.1:8086","metrics", "test","test"}) | ||||
| // | ||||
| func (p *MetricsPlugin) InfluxDB(freq time.Duration, url, database, username, password string) { | ||||
| 	go influxdb.InfluxDB(p.Registry, freq, url, database, username, password) | ||||
| 	go InfluxDB(p.Registry, freq, url, database, username, password) | ||||
| } | ||||
|  | ||||
| // InfluxDBWithTags reports metrics into influxdb with tags. | ||||
| @@ -120,7 +119,7 @@ func (p *MetricsPlugin) InfluxDB(freq time.Duration, url, database, username, pa | ||||
| // 	p.InfluxDBWithTags(10e9, "http://127.0.0.1:8086","metrics", "test","test", map[string]string{"host":"127.0.0.1"}) | ||||
| // | ||||
| func (p *MetricsPlugin) InfluxDBWithTags(freq time.Duration, url, database, username, password string, tags map[string]string) { | ||||
| 	go influxdb.InfluxDBWithTags(p.Registry, freq, url, database, username, password, tags) | ||||
| 	go InfluxDBWithTags(p.Registry, freq, url, database, username, password, tags) | ||||
| } | ||||
|  | ||||
| // Exp uses the same mechanism as the official expvar but exposed under /debug/metrics, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 smallnest
					smallnest