mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-05 16:57:51 +08:00
fix server self updates
This commit is contained in:
@@ -615,7 +615,7 @@ func runUpdates(node *models.Node, nodeUpdate bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := runServerPeerUpdate(node, isServer(node)); err != nil {
|
if err := runServerPeerUpdate(node, isServer(node)); err != nil {
|
||||||
logger.Log(1, "internal error when approving node:", node.ID)
|
logger.Log(1, "internal error when running peer node:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -142,9 +142,12 @@ func ServerUpdate(serverNode *models.Node, ifaceDelta bool) error {
|
|||||||
var err = serverPull(serverNode, ifaceDelta)
|
var err = serverPull(serverNode, ifaceDelta)
|
||||||
if isDeleteError(err) {
|
if isDeleteError(err) {
|
||||||
return DeleteNodeByID(serverNode, true)
|
return DeleteNodeByID(serverNode, true)
|
||||||
} else if err != nil {
|
} else if err != nil && !ifaceDelta {
|
||||||
|
err = serverPull(serverNode, true)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
actionCompleted := checkNodeActions(serverNode)
|
actionCompleted := checkNodeActions(serverNode)
|
||||||
if actionCompleted == models.NODE_DELETE {
|
if actionCompleted == models.NODE_DELETE {
|
||||||
|
@@ -5,7 +5,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
@@ -80,14 +79,16 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
|
|||||||
ncutils.Log("running pull for " + cfg.Node.Network)
|
ncutils.Log("running pull for " + cfg.Node.Network)
|
||||||
_, err := Pull(cfg.Node.Network, true)
|
_, err := Pull(cfg.Node.Network, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("[netclient] could not run pull, exiting ...", err.Error())
|
ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||||
ncutils.Log("unable to connect to broker, retrying ...")
|
ncutils.Log("unable to connect to broker, retrying ...")
|
||||||
if time.Now().After(tperiod) {
|
if time.Now().After(tperiod) {
|
||||||
log.Fatal("[netclient] could not connect to broker, exiting ...", token.Error())
|
ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
@@ -106,25 +107,29 @@ func MessageQueue(ctx context.Context, network string) {
|
|||||||
ncutils.Log("pulling latest config for " + cfg.Network)
|
ncutils.Log("pulling latest config for " + cfg.Network)
|
||||||
_, err := Pull(cfg.Network, true)
|
_, err := Pull(cfg.Network, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("[netclient] ", err.Error())
|
ncutils.Log(err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
ncutils.Log("daemon started for network: " + network)
|
ncutils.Log("daemon started for network: " + network)
|
||||||
client := SetupMQTT(&cfg)
|
client := SetupMQTT(&cfg)
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log(token.Error().Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
ncutils.Log("subscribed to all topics for debugging purposes")
|
ncutils.Log("subscribed to all topics for debugging purposes")
|
||||||
}
|
}
|
||||||
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log(token.Error().Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
||||||
}
|
}
|
||||||
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log(token.Error().Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
||||||
@@ -136,7 +141,8 @@ func MessageQueue(ctx context.Context, network string) {
|
|||||||
}
|
}
|
||||||
if server.Address != "" {
|
if server.Address != "" {
|
||||||
if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log(token.Error().Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
ncutils.Log("subscribed to server keepalives for server " + id)
|
ncutils.Log("subscribed to server keepalives for server " + id)
|
||||||
@@ -327,7 +333,8 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|||||||
case <-time.After(time.Second * 150):
|
case <-time.After(time.Second * 150):
|
||||||
if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
|
if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
|
||||||
ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue")
|
ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue")
|
||||||
Resubscribe(client, cfg)
|
err := Resubscribe(client, cfg)
|
||||||
|
ncutils.Log("closing " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -353,13 +360,15 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
|
|||||||
client.Disconnect(250)
|
client.Disconnect(250)
|
||||||
client = SetupMQTT(cfg)
|
client = SetupMQTT(cfg)
|
||||||
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, NodeUpdate); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, NodeUpdate); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log("error resubscribing to updates for " + cfg.Node.Network)
|
||||||
|
return token.Error()
|
||||||
}
|
}
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
|
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
|
||||||
}
|
}
|
||||||
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, UpdatePeers); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, UpdatePeers); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
|
||||||
|
return token.Error()
|
||||||
}
|
}
|
||||||
var id string
|
var id string
|
||||||
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
||||||
@@ -368,7 +377,8 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
|
|||||||
}
|
}
|
||||||
if server.Address != "" {
|
if server.Address != "" {
|
||||||
if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
||||||
log.Fatal("[netclient] ", token.Error())
|
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
|
||||||
|
return token.Error()
|
||||||
}
|
}
|
||||||
if cfg.DebugOn {
|
if cfg.DebugOn {
|
||||||
ncutils.Log("subscribed to server keepalives for server " + id)
|
ncutils.Log("subscribed to server keepalives for server " + id)
|
||||||
|
Reference in New Issue
Block a user