diff --git a/controllers/node.go b/controllers/node.go index 6e91b48e..be9d69e8 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -433,7 +433,7 @@ func getNode(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } - hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host) + hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil) if err != nil && !database.IsEmptyRecord(err) { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err)) @@ -616,7 +616,7 @@ func createNode(w http.ResponseWriter, r *http.Request) { return } } - hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host) + hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil) if err != nil && !database.IsEmptyRecord(err) { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err)) @@ -985,10 +985,17 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { if !fromNode { // notify node change runUpdates(&node, false) } - go func() { // notify of peer change - if err := mq.PublishPeerUpdate(); err != nil { + go func(deletedNode *models.Node, fromNode bool) { // notify of peer change + var err error + if fromNode { + err = mq.PublishDeletedNodePeerUpdate(deletedNode) + } else { + err = mq.PublishPeerUpdate() + } + if err != nil { logger.Log(1, "error publishing peer update ", err.Error()) } + host, err := logic.GetHost(node.HostID.String()) if err != nil { logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error()) @@ -996,7 +1003,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { if err := mq.PublishDNSDelete(&node, host); err != nil { logger.Log(1, "error publishing dns update", err.Error()) } - }() + }(&node, fromNode) } func runUpdates(node *models.Node, ifaceDelta bool) { diff --git a/logic/peers.go b/logic/peers.go index 15917a8f..dbc9583c 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -41,7 +41,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error relayPeersMap := make(map[string]models.RelayedConf) for _, relayedHost := range relayedHosts { relayedHost := relayedHost - payload, err := GetPeerUpdateForHost("", &relayedHost) + payload, err := GetPeerUpdateForHost("", &relayedHost, nil) if err == nil { relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost))) if udpErr == nil { @@ -118,7 +118,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error } // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks -func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpdate, error) { +func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node) (models.HostPeerUpdate, error) { if host == nil { return models.HostPeerUpdate{}, errors.New("host is nil") } @@ -139,6 +139,9 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd NodePeers: []wgtypes.PeerConfig{}, } var deletedNodes = []models.Node{} // used to track deleted nodes + if deletedNode != nil { + deletedNodes = append(deletedNodes, *deletedNode) + } logger.Log(1, "peer update for host ", host.ID.String()) peerIndexMap := make(map[string]int) for _, nodeID := range host.Nodes { diff --git a/mq/handlers.go b/mq/handlers.go index ec7ba4d8..e4b79611 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -227,11 +227,10 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") host, err := logic.GetHost(currentNode.HostID.String()) if err == nil { - if err = PublishSingleHostPeerUpdate(host); err != nil { + if err = PublishSingleHostPeerUpdate(host, nil); err != nil { logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) } } - } logger.Log(1, "updated node metrics", id) diff --git a/mq/publishers.go b/mq/publishers.go index 63dc093f..c641e0de 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -25,7 +25,7 @@ func PublishPeerUpdate() error { } for _, host := range hosts { host := host - err = PublishSingleHostPeerUpdate(&host) + err = PublishSingleHostPeerUpdate(&host, nil) if err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } @@ -33,10 +33,31 @@ func PublishPeerUpdate() error { return err } -// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host -func PublishSingleHostPeerUpdate(host *models.Host) error { +// PublishDeletedNodePeerUpdate --- determines and publishes a peer update +// to all the hosts with a deleted node to account for +func PublishDeletedNodePeerUpdate(delNode *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } - peerUpdate, err := logic.GetPeerUpdateForHost("", host) + hosts, err := logic.GetAllHosts() + if err != nil { + logger.Log(1, "err getting all hosts", err.Error()) + return err + } + for _, host := range hosts { + host := host + if err = PublishSingleHostPeerUpdate(&host, delNode); err != nil { + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + } + } + return err +} + +// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host +func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) error { + + peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode) if err != nil { return err } @@ -403,7 +424,7 @@ func sendPeers() { if force { host := host logger.Log(2, "sending scheduled peer update (5 min)") - err = PublishSingleHostPeerUpdate(&host) + err = PublishSingleHostPeerUpdate(&host, nil) if err != nil { logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) }