add mdns prototype

This commit is contained in:
smallnest
2017-10-24 20:25:13 +08:00
parent 3546c279a6
commit 6af22fe71a
2 changed files with 221 additions and 0 deletions

103
client/mdns_discovery.go Normal file
View File

@@ -0,0 +1,103 @@
package client
import (
"context"
"encoding/json"
"net/url"
"time"
"github.com/grandcat/zeroconf"
"github.com/smallnest/rpcx/log"
)
type serviceMeta struct {
Service string
Meta string
ServiceAddress string
}
// MDNSDiscovery is a mdns service discovery.
// It always returns the registered servers in etcd.
type MDNSDiscovery struct {
Timeout time.Duration
WatchInterval time.Duration
service string
pairs []*KVPair
chans []chan []*KVPair
}
// NewMDNSDiscovery returns a new MDNSDiscovery.
func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration) ServiceDiscovery {
d := &MDNSDiscovery{service: service, Timeout: timeout, WatchInterval: watchInterval}
go d.watch()
return d
}
// GetServices returns the servers
func (d MDNSDiscovery) GetServices() []*KVPair {
return d.pairs
}
// WatchService returns a nil chan.
func (d MDNSDiscovery) WatchService() chan []*KVPair {
ch := make(chan []*KVPair, 10)
d.chans = append(d.chans, ch)
return ch
}
func (d MDNSDiscovery) watch() {
t := time.NewTicker(d.WatchInterval)
for range t.C {
pairs, err := browse()
if err == nil {
d.pairs = pairs
for _, ch := range d.chans {
ch := ch
go func() {
select {
case ch <- pairs:
case <-time.After(time.Minute):
log.Warn("chan is full and new change has ben dropped")
}
}()
}
}
}
}
func browse() ([]*KVPair, error) {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
log.Warnf("Failed to initialize resolver: %v", err)
return nil, err
}
entries := make(chan *zeroconf.ServiceEntry)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
err = resolver.Browse(ctx, "_rpcxservices", "local.", entries)
if err != nil {
log.Warnf("Failed to browse: %v", err)
}
var totalServices []*KVPair
var services []*serviceMeta
for entry := range entries {
s, _ := url.QueryUnescape(entry.Text[0])
err := json.Unmarshal([]byte(s), &services)
if err != nil {
log.Warnf("Failed to browse: %v", err)
continue
}
for _, sm := range services {
totalServices = append(totalServices, &KVPair{
Key: sm.Service,
Value: sm.Meta,
})
}
}
return totalServices, nil
}

118
serverplugin/mdns.go Normal file
View File

@@ -0,0 +1,118 @@
package serverplugin
import (
"encoding/json"
"errors"
"net"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/grandcat/zeroconf"
metrics "github.com/rcrowley/go-metrics"
)
type serviceMeta struct {
Service string
Meta string
ServiceAddress string
}
// MDNSRegisterPlugin implements mdns/dns-sd registry.
type MDNSRegisterPlugin struct {
// service address, for example, tcp@127.0.0.1:8972, quic@127.0.0.1:1234
ServiceAddress string
port int
Metrics metrics.Registry
// Registered services
Services []*serviceMeta
UpdateInterval time.Duration
server *zeroconf.Server
}
// Start starts to connect etcd cluster
func (p *MDNSRegisterPlugin) Start() error {
data, _ := json.Marshal(p.Services)
s := url.QueryEscape(string(data))
host, _ := os.Hostname()
addr := p.ServiceAddress
i := strings.Index(addr, "@")
if i > 0 {
addr = addr[i+1:]
}
_, portStr, err := net.SplitHostPort(addr)
if err != nil {
panic(err)
}
p.port, err = strconv.Atoi(portStr)
if err != nil {
panic(err)
}
server, err := zeroconf.Register(host, "_rpcxservices", "local.", p.port, []string{s}, nil)
if err != nil {
panic(err)
}
p.server = server
if p.UpdateInterval > 0 {
ticker := time.NewTicker(p.UpdateInterval)
go func() {
p.server.Shutdown()
// refresh service TTL
for range ticker.C {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
data := []byte(strconv.FormatInt(clientMeter.Count()/60, 10))
//set this same metrics for all services at this server
for _, sm := range p.Services {
v, _ := url.ParseQuery(string(sm.Meta))
v.Set("tps", string(data))
sm.Meta = v.Encode()
}
ss, _ := json.Marshal(p.Services)
s := url.QueryEscape(string(ss))
p.server.SetText([]string{s})
}
}()
}
return nil
}
// HandleConnAccept handles connections from clients
func (p *MDNSRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
if p.Metrics != nil {
clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
clientMeter.Mark(1)
}
return conn, true
}
// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *MDNSRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
if "" == strings.TrimSpace(name) {
err = errors.New("Register service `name` can't be empty")
return
}
if p.server == nil {
return errors.New("MDNSRegisterPlugin has not started")
}
sm := &serviceMeta{
Service: name,
Meta: metadata,
ServiceAddress: p.ServiceAddress,
}
p.Services = append(p.Services, sm)
ss, _ := json.Marshal(p.Services)
s := url.QueryEscape(string(ss))
p.server.SetText([]string{s})
return
}