fix sending keepalives; fix where keepalive timestamps stored

This commit is contained in:
Matthew R Kasun
2022-01-30 14:50:18 -05:00
parent c421c1410b
commit c0a085a53f
2 changed files with 23 additions and 8 deletions

View File

@@ -200,7 +200,7 @@ func Keepalive(ctx context.Context) {
id = servAddr.ID id = servAddr.ID
} }
} }
if id != "" { if id == "" {
logger.Log(0, "leader not defined for network", network.NetID) logger.Log(0, "leader not defined for network", network.NetID)
continue continue
} }

View File

@@ -3,6 +3,7 @@ package functions
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"log" "log"
"os" "os"
@@ -66,7 +67,7 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
if server.Address != "" && server.IsLeader { if server.Address != "" && server.IsLeader {
ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s \n", server.Address, cfg.Node.Network)) ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network))
opts.AddBroker(server.Address + ":1883") opts.AddBroker(server.Address + ":1883")
break break
} }
@@ -106,13 +107,13 @@ func MessageQueue(ctx context.Context, network string) {
log.Fatal(token.Error()) log.Fatal(token.Error())
} }
if cfg.DebugOn { if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
} }
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
log.Fatal(token.Error()) log.Fatal(token.Error())
} }
if cfg.DebugOn { if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
} }
var id string var id string
for _, server := range cfg.NetworkSettings.DefaultServerAddrs { for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
@@ -124,7 +125,7 @@ func MessageQueue(ctx context.Context, network string) {
log.Fatal(token.Error()) log.Fatal(token.Error())
} }
if cfg.DebugOn { if cfg.DebugOn {
ncutils.Log("subscribed to server keepalives") ncutils.Log("subscribed to server keepalives for server " + id)
} }
} else { } else {
ncutils.Log("leader not defined for network" + cfg.Network) ncutils.Log("leader not defined for network" + cfg.Network)
@@ -295,6 +296,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
return return
case <-time.After(time.Second * 150): case <-time.After(time.Second * 150):
if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue")
Resubscribe(client, cfg) Resubscribe(client, cfg)
} }
} }
@@ -303,7 +305,10 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
// ServerKeepAlive -- handler to react to keepalive messages published by server // ServerKeepAlive -- handler to react to keepalive messages published by server
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
serverid := string(msg.Payload()) serverid, err := getID(msg.Topic())
if err != nil {
ncutils.Log("invalid ID in serverkeepalive topic")
}
keepalive[serverid] = time.Now() keepalive[serverid] = time.Now()
ncutils.Log("keepalive from server") ncutils.Log("keepalive from server")
} }
@@ -412,14 +417,14 @@ func PublishNodeUpdate(cfg *config.ClientConfig) {
ncutils.Log("error marshling node update " + err.Error()) ncutils.Log("error marshling node update " + err.Error())
} }
if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil { if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err)) ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err))
} }
} }
// Hello -- ping the broker to let server know node is alive and doing fine // Hello -- ping the broker to let server know node is alive and doing fine
func Hello(cfg *config.ClientConfig, network string) { func Hello(cfg *config.ClientConfig, network string) {
if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil { if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil {
ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err)) ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
} }
} }
@@ -478,3 +483,13 @@ func shouldResub(currentServers, newServers []models.ServerAddr) bool {
} }
return false return false
} }
func getID(topic string) (string, error) {
parts := strings.Split(topic, "/")
count := len(parts)
if count == 1 {
return "", errors.New("invalid topic")
}
//the last part of the topic will be the network.ID
return parts[count-1], nil
}