Merge remote-tracking branch 'origin/develop' into feature_v0.10.0_serverPings

This commit is contained in:
Matthew R Kasun
2022-01-30 09:18:24 -05:00
21 changed files with 412 additions and 129 deletions

View File

@@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"
@@ -15,6 +16,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/auth"
"github.com/gravitl/netmaker/netclient/config"
"github.com/gravitl/netmaker/netclient/local"
"github.com/gravitl/netmaker/netclient/ncutils"
@@ -100,17 +102,17 @@ func MessageQueue(ctx context.Context, network string) {
}
ncutils.Log("subscribed to all topics for debugging purposes")
}
if token := client.Subscribe("update/"+cfg.Node.ID, 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
if cfg.DebugOn {
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
if cfg.DebugOn {
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
var id string
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
@@ -148,20 +150,27 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
go func() {
var newNode models.Node
var cfg config.ClientConfig
err := json.Unmarshal(msg.Payload(), &newNode)
var network = parseNetworkFromTopic(msg.Topic())
cfg.Network = network
cfg.ReadConfig()
data, dataErr := decryptMsg(&cfg, msg.Payload())
if dataErr != nil {
return
}
err := json.Unmarshal([]byte(data), &newNode)
if err != nil {
ncutils.Log("error unmarshalling node update data" + err.Error())
return
}
ncutils.Log("received message to update node " + newNode.Name)
// see if cache hit, if so skip
var currentMessage = read(newNode.Network, lastNodeUpdate)
if currentMessage == string(msg.Payload()) {
if currentMessage == string(data) {
return
}
insert(newNode.Network, lastNodeUpdate, string(msg.Payload()))
cfg.Network = newNode.Network
cfg.ReadConfig()
insert(newNode.Network, lastNodeUpdate, string(data))
//check if interface name has changed if so delete.
if cfg.Node.Interface != newNode.Interface {
if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
@@ -230,21 +239,28 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
go func() {
var peerUpdate models.PeerUpdate
err := json.Unmarshal(msg.Payload(), &peerUpdate)
var network = parseNetworkFromTopic(msg.Topic())
var cfg = config.ClientConfig{}
cfg.Network = network
cfg.ReadConfig()
data, dataErr := decryptMsg(&cfg, msg.Payload())
if dataErr != nil {
return
}
err := json.Unmarshal([]byte(data), &peerUpdate)
if err != nil {
ncutils.Log("error unmarshalling peer data")
return
}
// see if cache hit, if so skip
var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
if currentMessage == string(msg.Payload()) {
if currentMessage == string(data) {
return
}
insert(peerUpdate.Network, lastPeerUpdate, string(msg.Payload()))
insert(peerUpdate.Network, lastPeerUpdate, string(data))
ncutils.Log("update peer handler")
var cfg config.ClientConfig
cfg.Network = peerUpdate.Network
cfg.ReadConfig()
var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs)
if shouldReSub {
Resubscribe(client, &cfg)
@@ -405,24 +421,64 @@ func PublishNodeUpdate(cfg *config.ClientConfig) {
if err := config.Write(cfg, cfg.Network); err != nil {
ncutils.Log("error saving configuration" + err.Error())
}
client := SetupMQTT(cfg)
data, err := json.Marshal(cfg.Node)
if err != nil {
ncutils.Log("error marshling node update " + err.Error())
}
if token := client.Publish("update/"+cfg.Node.ID, 0, false, data); token.Wait() && token.Error() != nil {
ncutils.Log("error publishing endpoint update " + token.Error().Error())
if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err))
}
client.Disconnect(250)
}
// Hello -- ping the broker to let server know node is alive and doing fine
func Hello(cfg *config.ClientConfig, network string) {
client := SetupMQTT(cfg)
if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil {
ncutils.Log("error publishing ping " + token.Error().Error())
if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil {
ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err))
}
client.Disconnect(250)
}
func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
// setup the keys
trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
if err != nil {
return err
}
serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
if err != nil {
return err
}
client := SetupMQTT(cfg)
defer client.Disconnect(250)
encrypted, err := ncutils.BoxEncrypt(msg, serverPubKey, trafficPrivKey)
if err != nil {
return err
}
if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func parseNetworkFromTopic(topic string) string {
return strings.Split(topic, "/")[1]
}
func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
// setup the keys
diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network)
if keyErr != nil {
return nil, keyErr
}
serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
if err != nil {
return nil, err
}
return ncutils.BoxDecrypt(msg, serverPubKey, diskKey)
}
func shouldResub(currentServers, newServers []models.ServerAddr) bool {