diff --git a/mq/mq.go b/mq/mq.go index b483558c..87c7ce8e 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -243,11 +243,7 @@ func Keepalive(ctx context.Context) { logger.Log(1, "leader not defined for network ", network.NetID) continue } - if token := client.Publish("serverkeepalive/"+network.NetID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { - logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) - } else { - logger.Log(2, "keepalive sent for network", network.NetID) - } + publishServerKeepalive(client, &network) err = serverctl.SyncServerNetwork(network.NetID) if err != nil { logger.Log(1, "error syncing server network", err.Error()) @@ -257,3 +253,17 @@ func Keepalive(ctx context.Context) { } } } + +func publishServerKeepalive(client mqtt.Client, network *models.Network) { + nodes, err := logic.GetNetworkNodes(network.NetID) + if err != nil { + return + } + for _, node := range nodes { + if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { + logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) + } else { + logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID) + } + } +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 30d3c52d..5be68c7d 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -188,7 +188,7 @@ func MessageQueue(ctx context.Context, network string) { continue } if server.Address != "" { - if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { ncutils.Log(token.Error().Error()) return } @@ -440,7 +440,7 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error { continue } if server.Address != "" { - if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network) return token.Error() } @@ -505,7 +505,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, if cfg.Node.Endpoint != extIP && extIP != "" { ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1) cfg.Node.Endpoint = extIP - PublishNodeUpdate(cfg) + if err := PublishNodeUpdate(cfg); err != nil { + ncutils.Log("could not publish endpoint change") + } } intIP, err := getPrivateAddr() if err != nil { @@ -514,7 +516,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, if cfg.Node.LocalAddress != intIP && intIP != "" { ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1) cfg.Node.LocalAddress = intIP - PublishNodeUpdate(cfg) + if err := PublishNodeUpdate(cfg); err != nil { + ncutils.Log("could not publish local address change") + } } } else { localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange) @@ -524,7 +528,9 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, if cfg.Node.Endpoint != localIP && localIP != "" { ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1) cfg.Node.Endpoint = localIP - PublishNodeUpdate(cfg) + if err := PublishNodeUpdate(cfg); err != nil { + ncutils.Log("could not publish localip change") + } } } if err := pingServer(cfg); err != nil { @@ -537,17 +543,18 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, } // PublishNodeUpdates -- saves node and pushes changes to broker -func PublishNodeUpdate(cfg *config.ClientConfig) { +func PublishNodeUpdate(cfg *config.ClientConfig) error { if err := config.Write(cfg, cfg.Network); err != nil { - ncutils.Log("error saving configuration: " + err.Error()) + return err } data, err := json.Marshal(cfg.Node) if err != nil { - ncutils.Log("error marshling node update: " + err.Error()) + return err } if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil { - ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err)) + return err } + return nil } // Hello -- ping the broker to let server know node is alive and doing fine @@ -617,14 +624,12 @@ func pingServer(cfg *config.ClientConfig) error { node := getServerAddress(cfg) pinger, err := ping.NewPinger(node) if err != nil { - ncutils.Log("error creating pinger " + err.Error()) return err } pinger.Timeout = 2 * time.Second pinger.Run() stats := pinger.Statistics() if stats.PacketLoss == 100 { - ncutils.PrintLog(fmt.Sprintf("lost packets when pinging server: packets sent:%d packets recieved: %d", stats.PacketsSent, stats.PacketsRecv), 1) return errors.New("ping error") } return nil