mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-04 16:33:49 +08:00
added ack and done signals
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/gravitl/netmaker/database"
|
"github.com/gravitl/netmaker/database"
|
||||||
@@ -642,14 +641,8 @@ func runUpdates(node *models.Node, ifaceDelta bool) {
|
|||||||
|
|
||||||
// updates local peers for a server on a given node's network
|
// updates local peers for a server on a given node's network
|
||||||
func runServerUpdate(node *models.Node, ifaceDelta bool) error {
|
func runServerUpdate(node *models.Node, ifaceDelta bool) error {
|
||||||
var mutex sync.Mutex
|
|
||||||
mutex.Lock()
|
|
||||||
defer mutex.Unlock()
|
|
||||||
if servercfg.IsClientMode() != "on" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isServer(node) {
|
if servercfg.IsClientMode() != "on" || !isServer(node) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/gravitl/netmaker/logger"
|
"github.com/gravitl/netmaker/logger"
|
||||||
"github.com/gravitl/netmaker/logic"
|
"github.com/gravitl/netmaker/logic"
|
||||||
"github.com/gravitl/netmaker/models"
|
"github.com/gravitl/netmaker/models"
|
||||||
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultHandler default message queue handler - only called when GetDebug == true
|
// DefaultHandler default message queue handler - only called when GetDebug == true
|
||||||
@@ -96,10 +97,30 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||||||
logger.Log(1, "error getting node ", id, err.Error())
|
logger.Log(1, "error getting node ", id, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := PublishPeerUpdate(¤tNode); err != nil {
|
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||||
logger.Log(1, "error publishing peer update ", err.Error())
|
if decryptErr != nil {
|
||||||
|
logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
switch decrypted[0] {
|
||||||
|
case ncutils.ACK:
|
||||||
|
currentServerNode, err := logic.GetNetworkServerLocal(currentNode.Network)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := logic.ServerUpdate(¤tServerNode, false); err != nil {
|
||||||
|
logger.Log(1, "server node:", currentServerNode.ID, "failed update")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case ncutils.DONE:
|
||||||
|
if err := PublishPeerUpdate(¤tNode); err != nil {
|
||||||
|
logger.Log(1, "error publishing peer update ", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case ncutils.KEY:
|
||||||
|
logger.Log(0, "I should have broke")
|
||||||
|
}
|
||||||
|
|
||||||
logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name)
|
logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@@ -199,11 +199,11 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
|
if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
|
||||||
pubErr := publishClientPeers(&cfg)
|
ackErr := publishSignal(&cfg, ncutils.ACK)
|
||||||
if pubErr != nil {
|
if ackErr != nil {
|
||||||
ncutils.Log("could not notify server to update peers after interface change")
|
ncutils.Log("could not notify server that it received an interface update")
|
||||||
} else {
|
} else {
|
||||||
ncutils.Log("signalled peer update to server")
|
ncutils.Log("signalled acknowledgement of change to server")
|
||||||
}
|
}
|
||||||
ncutils.Log("applying WG conf to " + file)
|
ncutils.Log("applying WG conf to " + file)
|
||||||
err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
|
err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
|
||||||
@@ -221,6 +221,12 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
doneErr := publishSignal(&cfg, ncutils.DONE)
|
||||||
|
if doneErr != nil {
|
||||||
|
ncutils.Log("could not notify server to update peers after interface change")
|
||||||
|
} else {
|
||||||
|
ncutils.Log("signalled finshed interface update to server")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//deal with DNS
|
//deal with DNS
|
||||||
if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
|
if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
|
||||||
@@ -480,9 +486,8 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// publishes a message to server to update peers on this peer's behalf
|
// publishes a message to server to update peers on this peer's behalf
|
||||||
func publishClientPeers(cfg *config.ClientConfig) error {
|
func publishSignal(cfg *config.ClientConfig, signal byte) error {
|
||||||
payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different
|
if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil {
|
||||||
if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
10
netclient/ncutils/constants.go
Normal file
10
netclient/ncutils/constants.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package ncutils
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ACK - acknowledgement signal for MQ
|
||||||
|
ACK = 1
|
||||||
|
// DONE - done signal for MQ
|
||||||
|
DONE = 2
|
||||||
|
// KEY - key update completed signal for MQ
|
||||||
|
KEY = 3
|
||||||
|
)
|
Reference in New Issue
Block a user