#251 add Stop methods for registries

This commit is contained in:
smallnest
2018-08-28 14:30:49 +08:00
parent 09c949d000
commit 398d782bf2
7 changed files with 242 additions and 75 deletions

View File

@@ -40,10 +40,20 @@ type ConsulRegisterPlugin struct {
Options *store.Config
kv store.Store
dying chan struct{}
done chan struct{}
}
// Start starts to connect consul cluster
func (p *ConsulRegisterPlugin) Start() error {
if p.done == nil {
p.done = make(chan struct{})
}
if p.dying == nil {
p.dying = make(chan struct{})
}
if p.kv == nil {
kv, err := libkv.NewStore(store.CONSUL, p.ConsulServers, p.Options)
if err != nil {
@@ -69,7 +79,12 @@ func (p *ConsulRegisterPlugin) Start() error {
defer p.kv.Close()
// refresh service TTL
for range ticker.C {
for {
select {
case <-p.dying:
close(p.done)
return
case <-ticker.C:
var data []byte
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
@@ -97,7 +112,7 @@ func (p *ConsulRegisterPlugin) Start() error {
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 3})
}
}
}
}
}()
}
@@ -105,6 +120,39 @@ func (p *ConsulRegisterPlugin) Start() error {
return nil
}
// Stop unregister all services.
func (p *ConsulRegisterPlugin) Stop() error {
close(p.dying)
<-p.done
if p.kv == nil {
kv, err := libkv.NewStore(store.CONSUL, p.ConsulServers, p.Options)
if err != nil {
log.Errorf("cannot create consul registry: %v", err)
return err
}
p.kv = kv
}
if p.BasePath[0] == '/' {
p.BasePath = p.BasePath[1:]
}
for _, name := range p.Services {
nodePath := fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress)
exist, err := p.kv.Exists(nodePath)
if err != nil {
log.Errorf("cannot delete path %s: %v", nodePath, err)
continue
}
if exist {
p.kv.Delete(nodePath)
log.Infof("delete path %s", nodePath, err)
}
}
return nil
}
// HandleConnAccept handles connections from clients
func (p *ConsulRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {

View File

@@ -34,4 +34,7 @@ func TestConsulRegistry(t *testing.T) {
t.Fatal("failed to register services in consul")
}
if err := r.Stop(); err != nil {
t.Fatal(err)
}
}

View File

@@ -40,10 +40,20 @@ type EtcdRegisterPlugin struct {
Options *store.Config
kv store.Store
dying chan struct{}
done chan struct{}
}
// Start starts to connect etcd cluster
func (p *EtcdRegisterPlugin) Start() error {
if p.done == nil {
p.done = make(chan struct{})
}
if p.dying == nil {
p.dying = make(chan struct{})
}
if p.kv == nil {
kv, err := libkv.NewStore(store.ETCD, p.EtcdServers, p.Options)
if err != nil {
@@ -65,7 +75,12 @@ func (p *EtcdRegisterPlugin) Start() error {
defer p.kv.Close()
// refresh service TTL
for range ticker.C {
for {
select {
case <-p.dying:
close(p.done)
return
case <-ticker.C:
var data []byte
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
@@ -93,7 +108,7 @@ func (p *EtcdRegisterPlugin) Start() error {
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 3})
}
}
}
}
}()
}
@@ -101,6 +116,35 @@ func (p *EtcdRegisterPlugin) Start() error {
return nil
}
// Stop unregister all services.
func (p *EtcdRegisterPlugin) Stop() error {
close(p.dying)
<-p.done
if p.kv == nil {
kv, err := libkv.NewStore(store.ETCD, p.EtcdServers, p.Options)
if err != nil {
log.Errorf("cannot create etcd registry: %v", err)
return err
}
p.kv = kv
}
for _, name := range p.Services {
nodePath := fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress)
exist, err := p.kv.Exists(nodePath)
if err != nil {
log.Errorf("cannot delete path %s: %v", nodePath, err)
continue
}
if exist {
p.kv.Delete(nodePath)
log.Infof("delete path %s", nodePath, err)
}
}
return nil
}
// HandleConnAccept handles connections from clients
func (p *EtcdRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {

View File

@@ -34,4 +34,7 @@ func TestEtcdRegistry(t *testing.T) {
t.Fatal("failed to register services in etcd")
}
if err := r.Stop(); err != nil {
t.Fatal(err)
}
}

View File

@@ -32,6 +32,9 @@ type MDNSRegisterPlugin struct {
server *zeroconf.Server
domain string
dying chan struct{}
done chan struct{}
}
// NewMDNSRegisterPlugin return a new MDNSRegisterPlugin.
@@ -46,6 +49,8 @@ func NewMDNSRegisterPlugin(serviceAddress string, port int, m metrics.Registry,
Metrics: m,
UpdateInterval: updateInterval,
domain: domain,
dying: make(chan struct{}),
done: make(chan struct{}),
}
}
@@ -62,9 +67,13 @@ func (p *MDNSRegisterPlugin) Start() error {
defer p.server.Shutdown()
// refresh service TTL
for range ticker.C {
select {
case <-p.dying:
close(p.done)
return
case <-ticker.C:
if p.server == nil && len(p.Services) == 0 {
continue
break
}
var data []byte
@@ -89,6 +98,14 @@ func (p *MDNSRegisterPlugin) Start() error {
return nil
}
// Stop unregister all services.
func (p *MDNSRegisterPlugin) Stop() error {
close(p.dying)
<-p.done
p.server.Shutdown()
return nil
}
func (p *MDNSRegisterPlugin) initMDNS() {
data, _ := json.Marshal(p.Services)
s := url.QueryEscape(string(data))

View File

@@ -41,10 +41,20 @@ type ZooKeeperRegisterPlugin struct {
Options *store.Config
kv store.Store
dying chan struct{}
done chan struct{}
}
// Start starts to connect zookeeper cluster
func (p *ZooKeeperRegisterPlugin) Start() error {
if p.done == nil {
p.done = make(chan struct{})
}
if p.dying == nil {
p.dying = make(chan struct{})
}
if p.kv == nil {
kv, err := libkv.NewStore(store.ZK, p.ZooKeeperServers, p.Options)
if err != nil {
@@ -70,7 +80,12 @@ func (p *ZooKeeperRegisterPlugin) Start() error {
defer p.kv.Close()
// refresh service TTL
for range ticker.C {
for {
select {
case <-p.dying:
close(p.done)
return
case <-ticker.C:
var data []byte
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
@@ -97,7 +112,7 @@ func (p *ZooKeeperRegisterPlugin) Start() error {
p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 3})
}
}
}
}
}()
}
@@ -105,6 +120,40 @@ func (p *ZooKeeperRegisterPlugin) Start() error {
return nil
}
// Stop unregister all services.
func (p *ZooKeeperRegisterPlugin) Stop() error {
close(p.dying)
<-p.done
if p.kv == nil {
kv, err := libkv.NewStore(store.ZK, p.ZooKeeperServers, p.Options)
if err != nil {
log.Errorf("cannot create zk registry: %v", err)
return err
}
p.kv = kv
}
if p.BasePath[0] == '/' {
p.BasePath = p.BasePath[1:]
}
for _, name := range p.Services {
nodePath := fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress)
exist, err := p.kv.Exists(nodePath)
if err != nil {
log.Errorf("cannot delete zk path %s: %v", nodePath, err)
continue
}
if exist {
p.kv.Delete(nodePath)
log.Infof("delete zk path %s", nodePath, err)
}
}
return nil
}
// HandleConnAccept handles connections from clients
func (p *ZooKeeperRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {

View File

@@ -34,4 +34,7 @@ func TestZookeeperRegistry(t *testing.T) {
t.Fatal("failed to register services in zookeeper")
}
if err := r.Stop(); err != nil {
t.Fatal(err)
}
}