#186 remove wathcer when xclient is closed

This commit is contained in:
smallnest
2017-12-13 12:05:25 +08:00
parent a6543956a1
commit 454b64a51a
8 changed files with 136 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ package client
import (
"strings"
"sync"
"time"
"github.com/docker/libkv"
@@ -23,7 +24,7 @@ type ConsulDiscovery struct {
kv store.Store
pairs []*KVPair
chans []chan []*KVPair
mu sync.Locker
// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
RetriesAfterWatchFailed int
}
@@ -86,6 +87,21 @@ func (d *ConsulDiscovery) WatchService() chan []*KVPair {
return ch
}
func (d *ConsulDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
func (d *ConsulDiscovery) watch() {
for {
var err error
@@ -127,6 +143,11 @@ func (d *ConsulDiscovery) watch() {
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):

View File

@@ -4,6 +4,7 @@ package client
import (
"strings"
"sync"
"time"
"github.com/docker/libkv"
@@ -23,6 +24,7 @@ type EtcdDiscovery struct {
kv store.Store
pairs []*KVPair
chans []chan []*KVPair
mu sync.Mutex
// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
RetriesAfterWatchFailed int
@@ -81,6 +83,22 @@ func (d *EtcdDiscovery) WatchService() chan []*KVPair {
return ch
}
func (d *EtcdDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
func (d *EtcdDiscovery) watch() {
for {
var err error
@@ -121,6 +139,12 @@ func (d *EtcdDiscovery) watch() {
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):

View File

@@ -24,3 +24,6 @@ func (d InprocessDiscovery) GetServices() []*KVPair {
func (d InprocessDiscovery) WatchService() chan []*KVPair {
return nil
}
func (d InprocessDiscovery) RemoveWatcher(ch chan []*KVPair) {
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net/url"
"sync"
"time"
"github.com/grandcat/zeroconf"
@@ -25,6 +26,8 @@ type MDNSDiscovery struct {
service string
pairs []*KVPair
chans []chan []*KVPair
mu sync.Mutex
}
// NewMDNSDiscovery returns a new MDNSDiscovery.
@@ -60,6 +63,22 @@ func (d *MDNSDiscovery) WatchService() chan []*KVPair {
return ch
}
func (d *MDNSDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
func (d *MDNSDiscovery) watch() {
t := time.NewTicker(d.WatchInterval)
for range t.C {
@@ -69,6 +88,11 @@ func (d *MDNSDiscovery) watch() {
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):

View File

@@ -1,6 +1,7 @@
package client
import (
"sync"
"time"
"github.com/smallnest/rpcx/log"
@@ -11,6 +12,7 @@ import (
type MultipleServersDiscovery struct {
pairs []*KVPair
chans []chan []*KVPair
mu sync.Mutex
}
// NewMultipleServersDiscovery returns a new MultipleServersDiscovery.
@@ -37,11 +39,32 @@ func (d *MultipleServersDiscovery) WatchService() chan []*KVPair {
return ch
}
func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
// Update is used to update servers at runtime.
func (d *MultipleServersDiscovery) Update(pairs []*KVPair) {
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):

View File

@@ -26,3 +26,5 @@ func (d Peer2PeerDiscovery) GetServices() []*KVPair {
func (d Peer2PeerDiscovery) WatchService() chan []*KVPair {
return nil
}
func (d *Peer2PeerDiscovery) RemoveWatcher(ch chan []*KVPair) {}

View File

@@ -48,6 +48,7 @@ type KVPair struct {
type ServiceDiscovery interface {
GetServices() []*KVPair
WatchService() chan []*KVPair
RemoveWatcher(ch chan []*KVPair)
Clone(servicePath string) ServiceDiscovery
}
@@ -70,6 +71,8 @@ type xClient struct {
auth string
Plugins PluginContainer
ch chan []*KVPair
}
// NewXClient creates a XClient that supports service discovery and service governance.
@@ -97,6 +100,7 @@ func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, di
ch := client.discovery.WatchService()
if ch != nil {
client.ch = ch
go client.watch(ch)
}
@@ -132,7 +136,6 @@ func (c *xClient) Auth(auth string) {
// watch changes of service and update cached clients.
func (c *xClient) watch(ch chan []*KVPair) {
for pairs := range ch {
servers := make(map[string]string)
for _, p := range pairs {
servers[p.Key] = p.Value
@@ -547,6 +550,17 @@ func (c *xClient) Close() error {
}
c.mu.Unlock()
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
c.discovery.RemoveWatcher(c.ch)
close(c.ch)
}()
if len(errs) > 0 {
return ex.NewMultiError(errs)
}

View File

@@ -4,6 +4,7 @@ package client
import (
"strings"
"sync"
"time"
"github.com/docker/libkv"
@@ -23,6 +24,7 @@ type ZookeeperDiscovery struct {
kv store.Store
pairs []*KVPair
chans []chan []*KVPair
mu sync.Mutex
// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
RetriesAfterWatchFailed int
@@ -88,6 +90,22 @@ func (d *ZookeeperDiscovery) WatchService() chan []*KVPair {
return ch
}
func (d *ZookeeperDiscovery) RemoveWatcher(ch chan []*KVPair) {
d.mu.Lock()
d.mu.Unlock()
var chans []chan []*KVPair
for _, c := range d.chans {
if c == ch {
continue
}
chans = append(chans, c)
}
d.chans = chans
}
func (d *ZookeeperDiscovery) watch() {
for {
var err error
@@ -130,6 +148,11 @@ func (d *ZookeeperDiscovery) watch() {
for _, ch := range d.chans {
ch := ch
go func() {
defer func() {
if r := recover(); r != nil {
}
}()
select {
case ch <- pairs:
case <-time.After(time.Minute):