diff --git a/Makefile b/Makefile index e62ec4e..8551d74 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ ineffassign: gocyclo: @ gocyclo -over 20 $(shell find . -name "*.go" |egrep -v "_testutils/*|vendor/*|pb\.go|_test\.go") -check: staticcheck gosimple ineffassign gocyclo +check: staticcheck gosimple ineffassign doc: godoc -http=:6060 @@ -43,10 +43,10 @@ fmt: go fmt ./... build: - go build -tags "reuseport kcp quic zookeeper etcd consul ping" ./... + go build ./... -buildu: +build-all: go build -tags "reuseport kcp quic zookeeper etcd consul ping" ./... test: - go test -tags "reuseport kcp quic zookeeper etcd consul ping" ./... + go test -race -tags "reuseport kcp quic zookeeper etcd consul ping" ./... diff --git a/server/server.go b/server/server.go index efc78e6..f0eaa6f 100644 --- a/server/server.go +++ b/server/server.go @@ -125,8 +125,6 @@ func (s *Server) Serve(network, address string) (err error) { // creating a new service goroutine for each. // The service goroutines read requests and then call services to reply to them. func (s *Server) serveListener(ln net.Listener) error { - s.ln = ln - if s.Plugins == nil { s.Plugins = &pluginContainer{} } @@ -134,6 +132,7 @@ func (s *Server) serveListener(ln net.Listener) error { var tempDelay time.Duration s.mu.Lock() + s.ln = ln if s.activeConn == nil { s.activeConn = make(map[net.Conn]struct{}) } diff --git a/serverplugin/consul.go b/serverplugin/consul.go index e8ec50d..5537e9f 100644 --- a/serverplugin/consul.go +++ b/serverplugin/consul.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/docker/libkv" @@ -33,6 +34,8 @@ type ConsulRegisterPlugin struct { Metrics metrics.Registry // Registered services Services []string + metasLock sync.RWMutex + metas map[string]string UpdateInterval time.Duration Options *store.Config @@ -74,7 +77,16 @@ func (p *ConsulRegisterPlugin) Start() error { nodePath := fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress) kvPaire, err := p.KV.Get(nodePath) if err != nil { - log.Infof("can't get data of node: %s, because of %v", nodePath, err.Error()) + log.Warnf("can't get data of node: %s, will re-create, because of %v", nodePath, err.Error()) + + p.metasLock.RLock() + meta := p.metas[name] + p.metasLock.RUnlock() + + err = p.KV.Put(nodePath, []byte(meta), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + if err != nil { + log.Errorf("cannot re-create consul path %s: %v", nodePath, err) + } } else { v, _ := url.ParseQuery(string(kvPaire.Value)) v.Set("tps", string(data)) @@ -133,12 +145,19 @@ func (p *ConsulRegisterPlugin) Register(name string, rcvr interface{}, metadata } nodePath = fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress) - err = p.KV.Put(nodePath, []byte(p.ServiceAddress), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + err = p.KV.Put(nodePath, []byte(metadata), &store.WriteOptions{TTL: p.UpdateInterval * 2}) if err != nil { log.Errorf("cannot create consul path %s: %v", nodePath, err) return err } p.Services = append(p.Services, name) + + p.metasLock.Lock() + if p.metas == nil { + p.metas = make(map[string]string) + } + p.metas[name] = metadata + p.metasLock.Unlock() return } diff --git a/serverplugin/etcd.go b/serverplugin/etcd.go index 311a2b3..a768ca1 100644 --- a/serverplugin/etcd.go +++ b/serverplugin/etcd.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/docker/libkv" @@ -33,6 +34,8 @@ type EtcdRegisterPlugin struct { Metrics metrics.Registry // Registered services Services []string + metasLock sync.RWMutex + metas map[string]string UpdateInterval time.Duration Options *store.Config @@ -71,6 +74,16 @@ func (p *EtcdRegisterPlugin) Start() error { kvPair, err := p.KV.Get(nodePath) if err != nil { log.Infof("can't get data of node: %s, because of %v", nodePath, err.Error()) + + p.metasLock.RLock() + meta := p.metas[name] + p.metasLock.RUnlock() + + err = p.KV.Put(nodePath, []byte(meta), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + if err != nil { + log.Errorf("cannot re-create etcd path %s: %v", nodePath, err) + } + } else { v, _ := url.ParseQuery(string(kvPair.Value)) v.Set("tps", string(data)) @@ -126,12 +139,19 @@ func (p *EtcdRegisterPlugin) Register(name string, rcvr interface{}, metadata st } nodePath = fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress) - err = p.KV.Put(nodePath, []byte(p.ServiceAddress), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + err = p.KV.Put(nodePath, []byte(metadata), &store.WriteOptions{TTL: p.UpdateInterval * 2}) if err != nil { log.Errorf("cannot create etcd path %s: %v", nodePath, err) return err } p.Services = append(p.Services, name) + + p.metasLock.Lock() + if p.metas == nil { + p.metas = make(map[string]string) + } + p.metas[name] = metadata + p.metasLock.Unlock() return } diff --git a/serverplugin/zookeeper.go b/serverplugin/zookeeper.go index 1aded30..53aa230 100644 --- a/serverplugin/zookeeper.go +++ b/serverplugin/zookeeper.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/docker/libkv" @@ -34,6 +35,8 @@ type ZooKeeperRegisterPlugin struct { Metrics metrics.Registry // Registered services Services []string + metasLock sync.RWMutex + metas map[string]string UpdateInterval time.Duration Options *store.Config @@ -76,6 +79,15 @@ func (p *ZooKeeperRegisterPlugin) Start() error { kvPaire, err := p.KV.Get(nodePath) if err != nil { log.Infof("can't get data of node: %s, because of %v", nodePath, err.Error()) + + p.metasLock.RLock() + meta := p.metas[name] + p.metasLock.RUnlock() + + err = p.KV.Put(nodePath, []byte(meta), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + if err != nil { + log.Errorf("cannot re-create zookeeper path %s: %v", nodePath, err) + } } else { v, _ := url.ParseQuery(string(kvPaire.Value)) v.Set("tps", string(data)) @@ -134,12 +146,19 @@ func (p *ZooKeeperRegisterPlugin) Register(name string, rcvr interface{}, metada } nodePath = fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress) - err = p.KV.Put(nodePath, []byte(p.ServiceAddress), &store.WriteOptions{TTL: p.UpdateInterval * 2}) + err = p.KV.Put(nodePath, []byte(metadata), &store.WriteOptions{TTL: p.UpdateInterval * 2}) if err != nil { log.Errorf("cannot create zk path %s: %v", nodePath, err) return err } p.Services = append(p.Services, name) + + p.metasLock.Lock() + if p.metas == nil { + p.metas = make(map[string]string) + } + p.metas[name] = metadata + p.metasLock.Unlock() return }