added context to peer updates, moved nodes to memory

This commit is contained in:
0xdcarns
2023-03-06 12:21:51 -05:00
parent db8a25607c
commit 28119c22ee
5 changed files with 219 additions and 185 deletions

View File

@@ -1,6 +1,7 @@
package controller package controller
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -433,7 +434,7 @@ func getNode(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil) hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil, context.Background())
if err != nil && !database.IsEmptyRecord(err) { if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"), logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err)) fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@@ -622,7 +623,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil) hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil, context.Background())
if err != nil && !database.IsEmptyRecord(err) { if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"), logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err)) fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err))

View File

@@ -32,17 +32,24 @@ const (
// GetNetworkNodes - gets the nodes of a network // GetNetworkNodes - gets the nodes of a network
func GetNetworkNodes(network string) ([]models.Node, error) { func GetNetworkNodes(network string) ([]models.Node, error) {
var nodes []models.Node
allnodes, err := GetAllNodes() allnodes, err := GetAllNodes()
if err != nil { if err != nil {
return []models.Node{}, err return []models.Node{}, err
} }
for _, node := range allnodes {
return GetNetworkNodesMemory(allnodes, network), nil
}
// GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory
func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node {
var nodes = []models.Node{}
for i := range allNodes {
node := allNodes[i]
if node.Network == network { if node.Network == network {
nodes = append(nodes, node) nodes = append(nodes, node)
} }
} }
return nodes, nil return nodes
} }
// UpdateNode - takes a node and updates another node with it's values // UpdateNode - takes a node and updates another node with it's values

View File

@@ -1,6 +1,7 @@
package logic package logic
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@@ -15,8 +16,15 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
var (
// PeerUpdateCtx context to send to host peer updates
PeerUpdateCtx context.Context
// PeerUpdateStop - the cancel for PeerUpdateCtx
PeerUpdateStop context.CancelFunc
)
// GetProxyUpdateForHost - gets the proxy update for host // GetProxyUpdateForHost - gets the proxy update for host
func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) { func GetProxyUpdateForHost(host *models.Host, ctx context.Context) (models.ProxyManagerPayload, error) {
proxyPayload := models.ProxyManagerPayload{ proxyPayload := models.ProxyManagerPayload{
Action: models.ProxyUpdate, Action: models.ProxyUpdate,
} }
@@ -39,7 +47,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
relayPeersMap := make(map[string]models.RelayedConf) relayPeersMap := make(map[string]models.RelayedConf)
for _, relayedHost := range relayedHosts { for _, relayedHost := range relayedHosts {
relayedHost := relayedHost relayedHost := relayedHost
payload, err := GetPeerUpdateForHost("", &relayedHost, nil) payload, err := GetPeerUpdateForHost("", &relayedHost, nil, ctx)
if err == nil { if err == nil {
relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost))) relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost)))
if udpErr == nil { if udpErr == nil {
@@ -115,11 +123,24 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
return proxyPayload, nil return proxyPayload, nil
} }
// ResetPeerUpdateContext - kills any current peer updates and resets the context
func ResetPeerUpdateContext() {
if PeerUpdateCtx != nil && PeerUpdateStop != nil {
PeerUpdateStop() // tell any current peer updates to stop
}
PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
}
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node) (models.HostPeerUpdate, error) { func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node, ctx context.Context) (models.HostPeerUpdate, error) {
if host == nil { if host == nil {
return models.HostPeerUpdate{}, errors.New("host is nil") return models.HostPeerUpdate{}, errors.New("host is nil")
} }
allNodes, err := GetAllNodes()
if err != nil {
return models.HostPeerUpdate{}, err
}
// track which nodes are deleted // track which nodes are deleted
// after peer calculation, if peer not in list, add delete config of peer // after peer calculation, if peer not in list, add delete config of peer
hostPeerUpdate := models.HostPeerUpdate{ hostPeerUpdate := models.HostPeerUpdate{
@@ -148,197 +169,200 @@ func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models
if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE { if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE {
continue continue
} }
currentPeers, err := GetNetworkNodes(node.Network) currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
if err != nil {
return models.HostPeerUpdate{}, err
}
var nodePeerMap map[string]models.PeerRouteInfo var nodePeerMap map[string]models.PeerRouteInfo
if node.IsIngressGateway || node.IsEgressGateway { if node.IsIngressGateway || node.IsEgressGateway {
nodePeerMap = make(map[string]models.PeerRouteInfo) nodePeerMap = make(map[string]models.PeerRouteInfo)
} }
for _, peer := range currentPeers { for _, peer := range currentPeers {
peer := peer select {
if peer.ID.String() == node.ID.String() { case <-ctx.Done():
logger.Log(2, "peer update, skipping self") logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
//skip yourself return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
continue default:
} peer := peer
var peerConfig wgtypes.PeerConfig if peer.ID.String() == node.ID.String() {
peerHost, err := GetHost(peer.HostID.String()) logger.Log(2, "peer update, skipping self")
if err != nil { //skip yourself
logger.Log(1, "no peer host", peer.HostID.String(), err.Error()) continue
return models.HostPeerUpdate{}, err
}
peerConfig.PublicKey = peerHost.PublicKey
peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
peerConfig.ReplaceAllowedIPs = true
uselocal := false
if host.EndpointIP.String() == peerHost.EndpointIP.String() {
//peer is on same network
// set to localaddress
uselocal = true
if node.LocalAddress.IP == nil {
// use public endpint
uselocal = false
} }
if node.LocalAddress.String() == peer.LocalAddress.String() { var peerConfig wgtypes.PeerConfig
uselocal = false peerHost, err := GetHost(peer.HostID.String())
if err != nil {
logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
return models.HostPeerUpdate{}, err
} }
}
peerConfig.Endpoint = &net.UDPAddr{
IP: peerHost.EndpointIP,
Port: GetPeerListenPort(peerHost),
}
if uselocal { peerConfig.PublicKey = peerHost.PublicKey
peerConfig.Endpoint.IP = peer.LocalAddress.IP peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
} peerConfig.ReplaceAllowedIPs = true
allowedips := GetAllowedIPs(&node, &peer, nil) uselocal := false
if peer.IsIngressGateway { if host.EndpointIP.String() == peerHost.EndpointIP.String() {
for _, entry := range peer.IngressGatewayRange { //peer is on same network
_, cidr, err := net.ParseCIDR(string(entry)) // set to localaddress
if err == nil { uselocal = true
allowedips = append(allowedips, *cidr) if node.LocalAddress.IP == nil {
// use public endpint
uselocal = false
}
if node.LocalAddress.String() == peer.LocalAddress.String() {
uselocal = false
} }
} }
} peerConfig.Endpoint = &net.UDPAddr{
if peer.IsEgressGateway { IP: peerHost.EndpointIP,
allowedips = append(allowedips, getEgressIPs(&node, &peer)...) Port: GetPeerListenPort(peerHost),
} }
if peer.Action != models.NODE_DELETE &&
!peer.PendingDelete &&
peer.Connected &&
nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
}
if node.IsIngressGateway || node.IsEgressGateway { if uselocal {
peerConfig.Endpoint.IP = peer.LocalAddress.IP
}
allowedips := GetAllowedIPs(&node, &peer, nil)
if peer.IsIngressGateway { if peer.IsIngressGateway {
_, extPeerIDAndAddrs, err := getExtPeers(&peer) for _, entry := range peer.IngressGatewayRange {
if err == nil { _, cidr, err := net.ParseCIDR(string(entry))
for _, extPeerIdAndAddr := range extPeerIDAndAddrs { if err == nil {
extPeerIdAndAddr := extPeerIdAndAddr allowedips = append(allowedips, *cidr)
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
PeerAddr: net.IPNet{
IP: net.ParseIP(extPeerIdAndAddr.Address),
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
},
PeerKey: extPeerIdAndAddr.ID,
Allow: true,
}
} }
} }
} }
if node.IsIngressGateway && peer.IsEgressGateway { if peer.IsEgressGateway {
hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges, allowedips = append(allowedips, getEgressIPs(&node, &peer)...)
peer.EgressGatewayRanges...)
} }
nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{ if peer.Action != models.NODE_DELETE &&
PeerAddr: net.IPNet{ !peer.PendingDelete &&
IP: net.ParseIP(peer.PrimaryAddress()), peer.Connected &&
Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()), nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
}, (deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
PeerKey: peerHost.PublicKey.String(), peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
Allow: true,
} }
}
var nodePeer wgtypes.PeerConfig if node.IsIngressGateway || node.IsEgressGateway {
if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok { if peer.IsIngressGateway {
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr) _, extPeerIDAndAddrs, err := getExtPeers(&peer)
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig) if err == nil {
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1 for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ extPeerIdAndAddr := extPeerIdAndAddr
ID: peer.ID.String(), nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
Address: peer.PrimaryAddress(), PeerAddr: net.IPNet{
Name: peerHost.Name, IP: net.ParseIP(extPeerIdAndAddr.Address),
Network: peer.Network, Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
} },
nodePeer = peerConfig PeerKey: extPeerIdAndAddr.ID,
} else { Allow: true,
peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs }
peerAllowedIPs = append(peerAllowedIPs, allowedips...) }
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs }
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ }
ID: peer.ID.String(), if node.IsIngressGateway && peer.IsEgressGateway {
Address: peer.PrimaryAddress(), hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
Name: peerHost.Name, peer.EgressGatewayRanges...)
Network: peer.Network, }
} nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
}
if node.Network == network { // add to peers map for metrics
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
ID: peer.ID.String(),
Address: peer.PrimaryAddress(),
Name: peerHost.Name,
Network: peer.Network,
}
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
}
}
var extPeers []wgtypes.PeerConfig
var extPeerIDAndAddrs []models.IDandAddr
if node.IsIngressGateway {
extPeers, extPeerIDAndAddrs, err = getExtPeers(&node)
if err == nil {
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
extPeerIdAndAddr := extPeerIdAndAddr
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
PeerAddr: net.IPNet{ PeerAddr: net.IPNet{
IP: net.ParseIP(extPeerIdAndAddr.Address), IP: net.ParseIP(peer.PrimaryAddress()),
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address), Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
}, },
PeerKey: extPeerIdAndAddr.ID, PeerKey: peerHost.PublicKey.String(),
Allow: true, Allow: true,
} }
} }
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
for _, extPeerIdAndAddr := range extPeerIDAndAddrs { var nodePeer wgtypes.PeerConfig
extPeerIdAndAddr := extPeerIdAndAddr if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr) hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{ hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
ID: extPeerIdAndAddr.ID, peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
Address: extPeerIdAndAddr.Address, hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
Name: extPeerIdAndAddr.Name, ID: peer.ID.String(),
Network: node.Network, Address: peer.PrimaryAddress(),
Name: peerHost.Name,
Network: peer.Network,
} }
hostPeerUpdate.IngressInfo.ExtPeers[extPeerIdAndAddr.ID] = models.ExtClientInfo{ nodePeer = peerConfig
Masquerade: true, } else {
IngGwAddr: net.IPNet{ peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
IP: net.ParseIP(node.PrimaryAddress()), peerAllowedIPs = append(peerAllowedIPs, allowedips...)
Mask: getCIDRMaskFromAddr(node.PrimaryAddress()), hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
}, hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
Network: node.PrimaryNetworkRange(), ID: peer.ID.String(),
ExtPeerAddr: net.IPNet{ Address: peer.PrimaryAddress(),
IP: net.ParseIP(extPeerIdAndAddr.Address), Name: peerHost.Name,
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address), Network: peer.Network,
},
ExtPeerKey: extPeerIdAndAddr.ID,
Peers: nodePeerMap,
}
if node.Network == network {
hostPeerUpdate.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, extPeers...)
} }
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
}
if node.Network == network { // add to peers map for metrics
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
ID: peer.ID.String(),
Address: peer.PrimaryAddress(),
Name: peerHost.Name,
Network: peer.Network,
}
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
} }
} else if !database.IsEmptyRecord(err) {
logger.Log(1, "error retrieving external clients:", err.Error())
} }
} var extPeers []wgtypes.PeerConfig
if node.IsEgressGateway { var extPeerIDAndAddrs []models.IDandAddr
hostPeerUpdate.EgressInfo[node.ID.String()] = models.EgressInfo{ if node.IsIngressGateway {
EgressID: node.ID.String(), extPeers, extPeerIDAndAddrs, err = getExtPeers(&node)
Network: node.PrimaryNetworkRange(), if err == nil {
EgressGwAddr: net.IPNet{ for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
IP: net.ParseIP(node.PrimaryAddress()), extPeerIdAndAddr := extPeerIdAndAddr
Mask: getCIDRMaskFromAddr(node.PrimaryAddress()), nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
}, PeerAddr: net.IPNet{
GwPeers: nodePeerMap, IP: net.ParseIP(extPeerIdAndAddr.Address),
EgressGWCfg: node.EgressGatewayRequest, Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
},
PeerKey: extPeerIdAndAddr.ID,
Allow: true,
}
}
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
extPeerIdAndAddr := extPeerIdAndAddr
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr)
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{
ID: extPeerIdAndAddr.ID,
Address: extPeerIdAndAddr.Address,
Name: extPeerIdAndAddr.Name,
Network: node.Network,
}
hostPeerUpdate.IngressInfo.ExtPeers[extPeerIdAndAddr.ID] = models.ExtClientInfo{
Masquerade: true,
IngGwAddr: net.IPNet{
IP: net.ParseIP(node.PrimaryAddress()),
Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
},
Network: node.PrimaryNetworkRange(),
ExtPeerAddr: net.IPNet{
IP: net.ParseIP(extPeerIdAndAddr.Address),
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
},
ExtPeerKey: extPeerIdAndAddr.ID,
Peers: nodePeerMap,
}
if node.Network == network {
hostPeerUpdate.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, extPeers...)
}
}
} else if !database.IsEmptyRecord(err) {
logger.Log(1, "error retrieving external clients:", err.Error())
}
}
if node.IsEgressGateway {
hostPeerUpdate.EgressInfo[node.ID.String()] = models.EgressInfo{
EgressID: node.ID.String(),
Network: node.PrimaryNetworkRange(),
EgressGwAddr: net.IPNet{
IP: net.ParseIP(node.PrimaryAddress()),
Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
},
GwPeers: nodePeerMap,
EgressGWCfg: node.EgressGatewayRequest,
}
} }
} }
} }

View File

@@ -1,6 +1,7 @@
package mq package mq
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time" "time"
@@ -184,7 +185,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
return return
} else { } else {
if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil { if err = PublishSingleHostPeerUpdate(currentHost, nil, context.Background()); err != nil {
logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
return return
} }
@@ -278,7 +279,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
host, err := logic.GetHost(currentNode.HostID.String()) host, err := logic.GetHost(currentNode.HostID.String())
if err == nil { if err == nil {
if err = PublishSingleHostPeerUpdate(host, nil); err != nil { if err = PublishSingleHostPeerUpdate(host, nil, context.Background()); err != nil {
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
} }
} }

View File

@@ -1,6 +1,7 @@
package mq package mq
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -23,10 +24,10 @@ func PublishPeerUpdate() error {
logger.Log(1, "err getting all hosts", err.Error()) logger.Log(1, "err getting all hosts", err.Error())
return err return err
} }
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
err = PublishSingleHostPeerUpdate(&host, nil) if err = PublishSingleHostPeerUpdate(&host, nil, logic.PeerUpdateCtx); err != nil {
if err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
} }
} }
@@ -45,9 +46,10 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
logger.Log(1, "err getting all hosts", err.Error()) logger.Log(1, "err getting all hosts", err.Error())
return err return err
} }
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
if err = PublishSingleHostPeerUpdate(&host, delNode); err != nil { if err = PublishSingleHostPeerUpdate(&host, delNode, logic.PeerUpdateCtx); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
} }
} }
@@ -55,9 +57,9 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
} }
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) error { func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node, ctx context.Context) error {
peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode) peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode, ctx)
if err != nil { if err != nil {
return err return err
} }
@@ -65,7 +67,7 @@ func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) er
return nil return nil
} }
if host.ProxyEnabled { if host.ProxyEnabled {
proxyUpdate, err := logic.GetProxyUpdateForHost(host) proxyUpdate, err := logic.GetProxyUpdateForHost(host, ctx)
if err != nil { if err != nil {
return err return err
} }
@@ -422,13 +424,12 @@ func sendPeers() {
//collectServerMetrics(networks[:]) //collectServerMetrics(networks[:])
} }
if force {
for _, host := range hosts { logic.ResetPeerUpdateContext()
if force { for _, host := range hosts {
host := host host := host
logger.Log(2, "sending scheduled peer update (5 min)") logger.Log(2, "sending scheduled peer update (5 min)")
err = PublishSingleHostPeerUpdate(&host, nil) if err = PublishSingleHostPeerUpdate(&host, nil, logic.PeerUpdateCtx); err != nil {
if err != nil {
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
} }
} }