diff --git a/mq/mq.go b/mq/mq.go index bbc208da..ca0f942e 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -200,7 +200,7 @@ func Keepalive(ctx context.Context) { id = servAddr.ID } } - if id != "" { + if id == "" { logger.Log(0, "leader not defined for network", network.NetID) continue } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 154c51d6..9f208bd1 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -3,6 +3,7 @@ package functions import ( "context" "encoding/json" + "errors" "fmt" "log" "os" @@ -66,7 +67,7 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { if server.Address != "" && server.IsLeader { - ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s \n", server.Address, cfg.Node.Network)) + ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network)) opts.AddBroker(server.Address + ":1883") break } @@ -106,13 +107,13 @@ func MessageQueue(ctx context.Context, network string) { log.Fatal(token.Error()) } if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) } if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) } var id string for _, server := range cfg.NetworkSettings.DefaultServerAddrs { @@ -124,7 +125,7 @@ func MessageQueue(ctx context.Context, network string) { log.Fatal(token.Error()) } if cfg.DebugOn { - ncutils.Log("subscribed to server keepalives") + ncutils.Log("subscribed to server keepalives for server " + id) } } else { ncutils.Log("leader not defined for network" + cfg.Network) @@ -295,6 +296,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien return case <-time.After(time.Second * 150): if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes + ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue") Resubscribe(client, cfg) } } @@ -303,7 +305,10 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien // ServerKeepAlive -- handler to react to keepalive messages published by server func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { - serverid := string(msg.Payload()) + serverid, err := getID(msg.Topic()) + if err != nil { + ncutils.Log("invalid ID in serverkeepalive topic") + } keepalive[serverid] = time.Now() ncutils.Log("keepalive from server") } @@ -412,14 +417,14 @@ func PublishNodeUpdate(cfg *config.ClientConfig) { ncutils.Log("error marshling node update " + err.Error()) } if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil { - ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err)) + ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err)) } } // Hello -- ping the broker to let server know node is alive and doing fine func Hello(cfg *config.ClientConfig, network string) { if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil { - ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err)) + ncutils.Log(fmt.Sprintf("error publishing ping, %v", err)) } } @@ -478,3 +483,13 @@ func shouldResub(currentServers, newServers []models.ServerAddr) bool { } return false } + +func getID(topic string) (string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if count == 1 { + return "", errors.New("invalid topic") + } + //the last part of the topic will be the network.ID + return parts[count-1], nil +}