Merge pull request #701 from gravitl/bugfix_v0.10.0_individual_serverkeepalive

publish individual server keepalive messages
This commit is contained in:
dcarns
2022-02-08 08:36:26 -05:00
committed by GitHub
2 changed files with 31 additions and 16 deletions

View File

@@ -188,7 +188,7 @@ func MessageQueue(ctx context.Context, network string) {
continue
}
if server.Address != "" {
if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
@@ -440,7 +440,7 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
continue
}
if server.Address != "" {
if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
return token.Error()
}
@@ -505,7 +505,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
if cfg.Node.Endpoint != extIP && extIP != "" {
ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
cfg.Node.Endpoint = extIP
PublishNodeUpdate(cfg)
if err := PublishNodeUpdate(cfg); err != nil {
ncutils.Log("could not publish endpoint change")
}
}
intIP, err := getPrivateAddr()
if err != nil {
@@ -514,7 +516,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
if cfg.Node.LocalAddress != intIP && intIP != "" {
ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
cfg.Node.LocalAddress = intIP
PublishNodeUpdate(cfg)
if err := PublishNodeUpdate(cfg); err != nil {
ncutils.Log("could not publish local address change")
}
}
} else {
localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
@@ -524,7 +528,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
if cfg.Node.Endpoint != localIP && localIP != "" {
ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
cfg.Node.Endpoint = localIP
PublishNodeUpdate(cfg)
if err := PublishNodeUpdate(cfg); err != nil {
ncutils.Log("could not publish localip change")
}
}
}
if err := pingServer(cfg); err != nil {
@@ -537,17 +543,18 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
}
// PublishNodeUpdates -- saves node and pushes changes to broker
func PublishNodeUpdate(cfg *config.ClientConfig) {
func PublishNodeUpdate(cfg *config.ClientConfig) error {
if err := config.Write(cfg, cfg.Network); err != nil {
ncutils.Log("error saving configuration: " + err.Error())
return err
}
data, err := json.Marshal(cfg.Node)
if err != nil {
ncutils.Log("error marshling node update: " + err.Error())
return err
}
if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err))
return err
}
return nil
}
// Hello -- ping the broker to let server know node is alive and doing fine
@@ -617,14 +624,12 @@ func pingServer(cfg *config.ClientConfig) error {
node := getServerAddress(cfg)
pinger, err := ping.NewPinger(node)
if err != nil {
ncutils.Log("error creating pinger " + err.Error())
return err
}
pinger.Timeout = 2 * time.Second
pinger.Run()
stats := pinger.Statistics()
if stats.PacketLoss == 100 {
ncutils.PrintLog(fmt.Sprintf("lost packets when pinging server: packets sent:%d packets recieved: %d", stats.PacketsSent, stats.PacketsRecv), 1)
return errors.New("ping error")
}
return nil