NET-725: Failovers (#2685)

* api to get host relayed from client

* add auto relay to api host

* add peer nat type

* set pro field on signal

* rm net check on relay me handler

* return success response

* re-establish failover logic

* set failOver ctx

* failOver with peer pub key

* failovered peer updates

* failover handlers, reset failovered peer on deletion

* rm unused funcs

* initialize failover handler on EE

* ignore failover node on signal

* failover changes

* set host id on signal

* extend signal model to include node ids

* add backwards compatibility

* add failover as node api

* set json response on failover handlers

* add failover field to api node

* fix signal data check

* initialize failover peer map

* reset failovered status when relayed or deleted

* add failover info to api node

* reset network failover

* only proceed further if failover exists in the network

* set failOver node defaults

* cannot set failover node as relayed

* debug log

* debug log

* debug changes

* debug changes

* debug changes

* revert debug changes

* don't add peers to idmap when removed

* reset failed Over

* fix static checks

* rm debug log

* add check for linux host
This commit is contained in:
Abhishek K
2023-11-29 20:10:07 +04:00
committed by GitHub
parent 5efa52279d
commit b78cc0a8a1
24 changed files with 434 additions and 262 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
@@ -99,6 +100,16 @@ func pull(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
// Walk every node registered on this host and, for each one that is currently
// failed over (FailedOverBy is set), dispatch a reset on its own goroutine.
for _, nodeID := range host.Nodes {
	node, err := logic.GetNodeByID(nodeID)
	if err != nil {
		// Log the ID that was requested: node may not be populated when the
		// lookup fails, so node.ID would not identify the missing node.
		slog.Error("failed to get node:", "id", nodeID, "error", err)
		continue
	}
	if node.FailedOverBy != uuid.Nil {
		go logic.ResetFailedOverPeer(&node)
	}
}
allNodes, err := logic.GetAllNodes() allNodes, err := logic.GetAllNodes()
if err != nil { if err != nil {
logger.Log(0, "failed to get nodes: ", hostID) logger.Log(0, "failed to get nodes: ", hostID)
@@ -533,39 +544,33 @@ func signalPeer(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return return
} }
if signal.ToHostPubKey == "" || signal.TurnRelayEndpoint == "" { if signal.ToHostPubKey == "" || (!servercfg.IsPro && signal.TurnRelayEndpoint == "") {
msg := "insufficient data to signal peer" msg := "insufficient data to signal peer"
logger.Log(0, r.Header.Get("user"), msg) logger.Log(0, r.Header.Get("user"), msg)
logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New(msg), "badrequest")) logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New(msg), "badrequest"))
return return
} }
hosts, err := logic.GetAllHosts() signal.IsPro = servercfg.IsPro
var peerHost *models.Host
if signal.ToHostID == "" {
peerHost, err = logic.GetHostByPubKey(signal.ToHostPubKey)
} else {
peerHost, err = logic.GetHost(signal.ToHostID)
}
if err != nil { if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to signal, peer not found"), "badrequest"))
return return
} }
// push the signal to host through mq
found := false
for _, hostI := range hosts {
if hostI.PublicKey.String() == signal.ToHostPubKey {
// found host publish message and break
found = true
err = mq.HostUpdate(&models.HostUpdate{ err = mq.HostUpdate(&models.HostUpdate{
Action: models.SignalHost, Action: models.SignalHost,
Host: hostI, Host: *peerHost,
Signal: signal, Signal: signal,
}) })
if err != nil { if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to publish signal to peer: "+err.Error()), "badrequest")) logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to publish signal to peer: "+err.Error()), "badrequest"))
return return
} }
break
}
}
if !found {
logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to signal, peer not found"), "badrequest"))
return
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(signal) json.NewEncoder(w).Encode(signal)
} }

View File

@@ -218,7 +218,5 @@ func convertLegacyNode(legacy models.LegacyNode, hostID uuid.UUID) models.Node {
node.IngressGatewayRange6 = legacy.IngressGatewayRange6 node.IngressGatewayRange6 = legacy.IngressGatewayRange6
node.DefaultACL = legacy.DefaultACL node.DefaultACL = legacy.DefaultACL
node.OwnerID = legacy.OwnerID node.OwnerID = legacy.OwnerID
node.FailoverNode, _ = uuid.Parse(legacy.FailoverNode)
node.Failover = models.ParseBool(legacy.Failover)
return node return node
} }

View File

@@ -341,7 +341,6 @@ func getAllNodes(w http.ResponseWriter, r *http.Request) {
func getNode(w http.ResponseWriter, r *http.Request) { func getNode(w http.ResponseWriter, r *http.Request) {
// set header. // set header.
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
nodeRequest := r.Header.Get("requestfrom") == "node"
var params = mux.Vars(r) var params = mux.Vars(r)
nodeid := params["nodeid"] nodeid := params["nodeid"]
@@ -386,12 +385,6 @@ func getNode(w http.ResponseWriter, r *http.Request) {
PeerIDs: hostPeerUpdate.PeerIDs, PeerIDs: hostPeerUpdate.PeerIDs,
} }
if servercfg.IsPro && nodeRequest {
if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
logger.Log(1, "failed to reset failover list during node config pull", node.ID.String(), node.Network)
}
}
logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"]) logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"])
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response) json.NewEncoder(w).Encode(response)
@@ -524,12 +517,6 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
return return
} }
if servercfg.IsPro && request.Failover {
if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
logger.Log(1, "failed to reset failover list during failover create", node.ID.String(), node.Network)
}
}
apiNode := node.ConvertToAPINode() apiNode := node.ConvertToAPINode()
logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid) logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@@ -562,7 +549,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "bad request")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "bad request"))
return return
} }
node, wasFailover, removedClients, err := logic.DeleteIngressGateway(nodeid) node, removedClients, err := logic.DeleteIngressGateway(nodeid)
if err != nil { if err != nil {
logger.Log(0, r.Header.Get("user"), logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v", fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v",
@@ -572,11 +559,6 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
} }
if servercfg.IsPro { if servercfg.IsPro {
if wasFailover {
if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
logger.Log(1, "failed to reset failover list during failover create", node.ID.String(), node.Network)
}
}
go func() { go func() {
users, err := logic.GetUsersDB() users, err := logic.GetUsersDB()
if err == nil { if err == nil {
@@ -662,11 +644,6 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
} }
ifaceDelta := logic.IfaceDelta(&currentNode, newNode) ifaceDelta := logic.IfaceDelta(&currentNode, newNode)
aclUpdate := currentNode.DefaultACL != newNode.DefaultACL aclUpdate := currentNode.DefaultACL != newNode.DefaultACL
if ifaceDelta && servercfg.IsPro {
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
logger.Log(0, "failed to reset failover lists during node update for node", currentNode.ID.String(), currentNode.Network)
}
}
err = logic.UpdateNode(&currentNode, newNode) err = logic.UpdateNode(&currentNode, newNode)
if err != nil { if err != nil {

View File

@@ -44,6 +44,17 @@ func ReturnSuccessResponse(response http.ResponseWriter, request *http.Request,
json.NewEncoder(response).Encode(httpResponse) json.NewEncoder(response).Encode(httpResponse)
} }
// ReturnSuccessResponseWithJson - writes a 200 JSON response that carries res and message
func ReturnSuccessResponseWithJson(response http.ResponseWriter, request *http.Request, res interface{}, message string) {
	payload := models.SuccessResponse{
		Code:     http.StatusOK,
		Message:  message,
		Response: res,
	}
	// headers must be set before the status line is written
	response.Header().Set("Content-Type", "application/json")
	response.WriteHeader(http.StatusOK)
	json.NewEncoder(response).Encode(payload)
}
// ReturnErrorResponse - processes error and adds header // ReturnErrorResponse - processes error and adds header
func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, errorMessage models.ErrorResponse) { func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, errorMessage models.ErrorResponse) {
httpResponse := &models.ErrorResponse{Code: errorMessage.Code, Message: errorMessage.Message} httpResponse := &models.ErrorResponse{Code: errorMessage.Code, Message: errorMessage.Message}

View File

@@ -8,7 +8,6 @@ import (
"github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
) )
// GetInternetGateways - gets all the nodes that are internet gateways // GetInternetGateways - gets all the nodes that are internet gateways
@@ -168,9 +167,6 @@ func CreateIngressGateway(netid string, nodeid string, ingress models.IngressReq
node.IngressGatewayRange6 = network.AddressRange6 node.IngressGatewayRange6 = network.AddressRange6
node.IngressDNS = ingress.ExtclientDNS node.IngressDNS = ingress.ExtclientDNS
node.SetLastModified() node.SetLastModified()
if ingress.Failover && servercfg.IsPro {
node.Failover = true
}
err = UpsertNode(&node) err = UpsertNode(&node)
if err != nil { if err != nil {
return models.Node{}, err return models.Node{}, err
@@ -199,35 +195,33 @@ func GetIngressGwUsers(node models.Node) (models.IngressGwUsers, error) {
} }
// DeleteIngressGateway - deletes an ingress gateway // DeleteIngressGateway - deletes an ingress gateway
func DeleteIngressGateway(nodeid string) (models.Node, bool, []models.ExtClient, error) { func DeleteIngressGateway(nodeid string) (models.Node, []models.ExtClient, error) {
removedClients := []models.ExtClient{} removedClients := []models.ExtClient{}
node, err := GetNodeByID(nodeid) node, err := GetNodeByID(nodeid)
if err != nil { if err != nil {
return models.Node{}, false, removedClients, err return models.Node{}, removedClients, err
} }
clients, err := GetExtClientsByID(nodeid, node.Network) clients, err := GetExtClientsByID(nodeid, node.Network)
if err != nil && !database.IsEmptyRecord(err) { if err != nil && !database.IsEmptyRecord(err) {
return models.Node{}, false, removedClients, err return models.Node{}, removedClients, err
} }
removedClients = clients removedClients = clients
// delete ext clients belonging to ingress gateway // delete ext clients belonging to ingress gateway
if err = DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil { if err = DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
return models.Node{}, false, removedClients, err return models.Node{}, removedClients, err
} }
logger.Log(3, "deleting ingress gateway") logger.Log(3, "deleting ingress gateway")
wasFailover := node.Failover
node.LastModified = time.Now() node.LastModified = time.Now()
node.IsIngressGateway = false node.IsIngressGateway = false
node.IngressGatewayRange = "" node.IngressGatewayRange = ""
node.Failover = false
err = UpsertNode(&node) err = UpsertNode(&node)
if err != nil { if err != nil {
return models.Node{}, wasFailover, removedClients, err return models.Node{}, removedClients, err
} }
err = SetNetworkNodesLastModified(node.Network) err = SetNetworkNodesLastModified(node.Network)
return node, wasFailover, removedClients, err return node, removedClients, err
} }
// DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network // DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network

View File

@@ -156,6 +156,20 @@ func GetHost(hostid string) (*models.Host, error) {
return &h, nil return &h, nil
} }
// GetHostByPubKey - scans all hosts and returns the first one whose public key
// string equals hostPubKey; returns a "host not found" error when none matches
func GetHostByPubKey(hostPubKey string) (*models.Host, error) {
	allHosts, err := GetAllHosts()
	if err != nil {
		return nil, err
	}
	for i := range allHosts {
		if allHosts[i].PublicKey.String() != hostPubKey {
			continue
		}
		// hand back a pointer to a copy, not to the slice element
		match := allHosts[i]
		return &match, nil
	}
	return nil, errors.New("host not found")
}
// CreateHost - creates a host if not exist // CreateHost - creates a host if not exist
func CreateHost(h *models.Host) error { func CreateHost(h *models.Host) error {
hosts, hErr := GetAllHosts() hosts, hErr := GetAllHosts()

View File

@@ -205,6 +205,9 @@ func DeleteNode(node *models.Node, purge bool) error {
UpsertNode(&relayNode) UpsertNode(&relayNode)
} }
} }
if node.FailedOverBy != uuid.Nil {
ResetFailedOverPeer(node)
}
if node.IsRelay { if node.IsRelay {
// unset all the relayed nodes // unset all the relayed nodes
SetRelayedNodes(false, node.ID.String(), node.RelayedNodes) SetRelayedNodes(false, node.ID.String(), node.RelayedNodes)
@@ -233,11 +236,6 @@ func DeleteNode(node *models.Node, purge bool) error {
if err := DissasociateNodeFromHost(node, host); err != nil { if err := DissasociateNodeFromHost(node, host); err != nil {
return err return err
} }
if servercfg.IsPro {
if err := EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
logger.Log(0, "failed to reset failover lists during node delete for node", host.Name, node.Network)
}
}
return nil return nil
} }
@@ -309,20 +307,6 @@ func ValidateNode(node *models.Node, isUpdate bool) error {
return err return err
} }
// IsFailoverPresent - checks if a node is marked as a failover in given network
func IsFailoverPresent(network string) bool {
netNodes, err := GetNetworkNodes(network)
if err != nil {
return false
}
for i := range netNodes {
if netNodes[i].Failover {
return true
}
}
return false
}
// GetAllNodes - returns all nodes in the DB // GetAllNodes - returns all nodes in the DB
func GetAllNodes() ([]models.Node, error) { func GetAllNodes() ([]models.Node, error) {
var nodes []models.Node var nodes []models.Node
@@ -385,6 +369,9 @@ func SetNodeDefaults(node *models.Node) {
if node.DefaultACL == "" { if node.DefaultACL == "" {
node.DefaultACL = parentNetwork.DefaultACL node.DefaultACL = parentNetwork.DefaultACL
} }
if node.FailOverPeers == nil {
node.FailOverPeers = make(map[string]struct{})
}
node.SetLastModified() node.SetLastModified()
node.SetLastCheckIn() node.SetLastCheckIn()

View File

@@ -15,6 +15,17 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
var (
	// ResetFailOver - function to reset failOvered peers on this node.
	// Declared as a variable so it can be reassigned; this default does nothing and returns nil.
	ResetFailOver = func(_ *models.Node) error { return nil }

	// ResetFailedOverPeer - removes failed over node from network peers.
	// Declared as a variable so it can be reassigned; this default does nothing and returns nil.
	ResetFailedOverPeer = func(_ *models.Node) error { return nil }
)
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.Node, func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.Node,
deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) { deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
@@ -132,7 +143,9 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
if peer.IsIngressGateway { if peer.IsIngressGateway {
hostPeerUpdate.EgressRoutes = append(hostPeerUpdate.EgressRoutes, getExtpeersExtraRoutes(peer.Network)...) hostPeerUpdate.EgressRoutes = append(hostPeerUpdate.EgressRoutes, getExtpeersExtraRoutes(peer.Network)...)
} }
if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) { _, isFailOverPeer := node.FailOverPeers[peer.ID.String()]
if (node.IsRelayed && node.RelayedBy != peer.ID.String()) ||
(peer.IsRelayed && peer.RelayedBy != node.ID.String()) || isFailOverPeer {
// if node is relayed and peer is not the relay, set remove to true // if node is relayed and peer is not the relay, set remove to true
if _, ok := peerIndexMap[peerHost.PublicKey.String()]; ok { if _, ok := peerIndexMap[peerHost.PublicKey.String()]; ok {
continue continue
@@ -197,9 +210,10 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]] nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
} }
if node.Network == network { // add to peers map for metrics if node.Network == network && !peerConfig.Remove { // add to peers map for metrics
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{ hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
ID: peer.ID.String(), ID: peer.ID.String(),
HostID: peerHost.ID.String(),
Address: peer.PrimaryAddress(), Address: peer.PrimaryAddress(),
Name: peerHost.Name, Name: peerHost.Name,
Network: peer.Network, Network: peer.Network,
@@ -316,6 +330,31 @@ func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet
return allowedips return allowedips
} }
// GetFailOverPeerIps - returns single-address allowed IPs (/32 for Address,
// /128 for Address6) of every node listed in node.FailOverPeers whose
// FailedOverBy equals peer.ID. Entries that cannot be looked up are skipped.
func GetFailOverPeerIps(peer, node *models.Node) []net.IPNet {
	allowedips := []net.IPNet{}
	for id := range node.FailOverPeers {
		candidate, err := GetNodeByID(id)
		if err != nil || candidate.FailedOverBy != peer.ID {
			continue
		}
		if ip4 := candidate.Address.IP; ip4 != nil {
			allowedips = append(allowedips, net.IPNet{
				IP:   ip4,
				Mask: net.CIDRMask(32, 32),
			})
		}
		if ip6 := candidate.Address6.IP; ip6 != nil {
			allowedips = append(allowedips, net.IPNet{
				IP:   ip6,
				Mask: net.CIDRMask(128, 128),
			})
		}
	}
	return allowedips
}
func GetEgressIPs(peer *models.Node) []net.IPNet { func GetEgressIPs(peer *models.Node) []net.IPNet {
peerHost, err := GetHost(peer.HostID.String()) peerHost, err := GetHost(peer.HostID.String())
@@ -379,6 +418,9 @@ func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet {
if peer.IsRelay { if peer.IsRelay {
allowedips = append(allowedips, RelayedAllowedIPs(peer, node)...) allowedips = append(allowedips, RelayedAllowedIPs(peer, node)...)
} }
if peer.IsFailOver {
allowedips = append(allowedips, GetFailOverPeerIps(peer, node)...)
}
return allowedips return allowedips
} }

View File

@@ -1,22 +1,8 @@
package logic package logic
import (
"github.com/google/uuid"
"github.com/gravitl/netmaker/models"
)
// EnterpriseCheckFuncs - can be set to run functions for EE // EnterpriseCheckFuncs - can be set to run functions for EE
var EnterpriseCheckFuncs []func() var EnterpriseCheckFuncs []func()
// EnterpriseFailoverFunc - interface to control failover funcs
var EnterpriseFailoverFunc func(node *models.Node) error
// EnterpriseResetFailoverFunc - interface to control reset failover funcs
var EnterpriseResetFailoverFunc func(network string) error
// EnterpriseResetAllPeersFailovers - resets all nodes that are considering a node to be failover worthy (inclusive)
var EnterpriseResetAllPeersFailovers func(nodeid uuid.UUID, network string) error
// == Join, Checkin, and Leave for Server == // == Join, Checkin, and Leave for Server ==
// KUBERNETES_LISTEN_PORT - starting port for Kubernetes in order to use NodePort range // KUBERNETES_LISTEN_PORT - starting port for Kubernetes in order to use NodePort range

View File

@@ -26,10 +26,6 @@ type ApiHost struct {
MacAddress string `json:"macaddress"` MacAddress string `json:"macaddress"`
Nodes []string `json:"nodes"` Nodes []string `json:"nodes"`
IsDefault bool `json:"isdefault" yaml:"isdefault"` IsDefault bool `json:"isdefault" yaml:"isdefault"`
IsRelayed bool `json:"isrelayed" yaml:"isrelayed" bson:"isrelayed"`
RelayedBy string `json:"relayed_by" yaml:"relayed_by" bson:"relayed_by"`
IsRelay bool `json:"isrelay" yaml:"isrelay" bson:"isrelay"`
RelayedHosts []string `json:"relay_hosts" yaml:"relay_hosts" bson:"relay_hosts"`
NatType string `json:"nat_type" yaml:"nat_type"` NatType string `json:"nat_type" yaml:"nat_type"`
PersistentKeepalive int `json:"persistentkeepalive" yaml:"persistentkeepalive"` PersistentKeepalive int `json:"persistentkeepalive" yaml:"persistentkeepalive"`
AutoUpdate bool `json:"autoupdate" yaml:"autoupdate"` AutoUpdate bool `json:"autoupdate" yaml:"autoupdate"`

View File

@@ -30,7 +30,6 @@ type ApiNode struct {
IsIngressGateway bool `json:"isingressgateway"` IsIngressGateway bool `json:"isingressgateway"`
EgressGatewayRanges []string `json:"egressgatewayranges"` EgressGatewayRanges []string `json:"egressgatewayranges"`
EgressGatewayNatEnabled bool `json:"egressgatewaynatenabled"` EgressGatewayNatEnabled bool `json:"egressgatewaynatenabled"`
FailoverNode string `json:"failovernode"`
DNSOn bool `json:"dnson"` DNSOn bool `json:"dnson"`
IngressDns string `json:"ingressdns"` IngressDns string `json:"ingressdns"`
Server string `json:"server"` Server string `json:"server"`
@@ -39,7 +38,9 @@ type ApiNode struct {
PendingDelete bool `json:"pendingdelete"` PendingDelete bool `json:"pendingdelete"`
// == PRO == // == PRO ==
DefaultACL string `json:"defaultacl,omitempty" validate:"checkyesornoorunset"` DefaultACL string `json:"defaultacl,omitempty" validate:"checkyesornoorunset"`
Failover bool `json:"failover"` IsFailOver bool `json:"is_fail_over"`
FailOverPeers map[string]struct{} `json:"fail_over_peers" yaml:"fail_over_peers"`
FailedOverBy uuid.UUID `json:"failed_over_by" yaml:"failed_over_by"`
} }
// ApiNode.ConvertToServerNode - converts an api node to a server node // ApiNode.ConvertToServerNode - converts an api node to a server node
@@ -56,7 +57,8 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node {
convertedNode.RelayedBy = a.RelayedBy convertedNode.RelayedBy = a.RelayedBy
convertedNode.RelayedNodes = a.RelayedNodes convertedNode.RelayedNodes = a.RelayedNodes
convertedNode.PendingDelete = a.PendingDelete convertedNode.PendingDelete = a.PendingDelete
convertedNode.Failover = a.Failover convertedNode.FailedOverBy = currentNode.FailedOverBy
convertedNode.FailOverPeers = currentNode.FailOverPeers
convertedNode.IsEgressGateway = a.IsEgressGateway convertedNode.IsEgressGateway = a.IsEgressGateway
convertedNode.IsIngressGateway = a.IsIngressGateway convertedNode.IsIngressGateway = a.IsIngressGateway
// prevents user from changing ranges, must delete and recreate // prevents user from changing ranges, must delete and recreate
@@ -100,7 +102,6 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node {
convertedNode.Address6 = *addr6 convertedNode.Address6 = *addr6
convertedNode.Address6.IP = ip6 convertedNode.Address6.IP = ip6
} }
convertedNode.FailoverNode, _ = uuid.Parse(a.FailoverNode)
convertedNode.LastModified = time.Unix(a.LastModified, 0) convertedNode.LastModified = time.Unix(a.LastModified, 0)
convertedNode.LastCheckIn = time.Unix(a.LastCheckIn, 0) convertedNode.LastCheckIn = time.Unix(a.LastCheckIn, 0)
convertedNode.LastPeerUpdate = time.Unix(a.LastPeerUpdate, 0) convertedNode.LastPeerUpdate = time.Unix(a.LastPeerUpdate, 0)
@@ -146,10 +147,6 @@ func (nm *Node) ConvertToAPINode() *ApiNode {
apiNode.IsIngressGateway = nm.IsIngressGateway apiNode.IsIngressGateway = nm.IsIngressGateway
apiNode.EgressGatewayRanges = nm.EgressGatewayRanges apiNode.EgressGatewayRanges = nm.EgressGatewayRanges
apiNode.EgressGatewayNatEnabled = nm.EgressGatewayNatEnabled apiNode.EgressGatewayNatEnabled = nm.EgressGatewayNatEnabled
apiNode.FailoverNode = nm.FailoverNode.String()
if isUUIDSet(apiNode.FailoverNode) {
apiNode.FailoverNode = ""
}
apiNode.DNSOn = nm.DNSOn apiNode.DNSOn = nm.DNSOn
apiNode.IngressDns = nm.IngressDNS apiNode.IngressDns = nm.IngressDNS
apiNode.Server = nm.Server apiNode.Server = nm.Server
@@ -160,14 +157,12 @@ func (nm *Node) ConvertToAPINode() *ApiNode {
apiNode.Connected = nm.Connected apiNode.Connected = nm.Connected
apiNode.PendingDelete = nm.PendingDelete apiNode.PendingDelete = nm.PendingDelete
apiNode.DefaultACL = nm.DefaultACL apiNode.DefaultACL = nm.DefaultACL
apiNode.Failover = nm.Failover apiNode.IsFailOver = nm.IsFailOver
apiNode.FailOverPeers = nm.FailOverPeers
apiNode.FailedOverBy = nm.FailedOverBy
return &apiNode return &apiNode
} }
func isEmptyAddr(addr string) bool { func isEmptyAddr(addr string) bool {
return addr == "<nil>" || addr == ":0" return addr == "<nil>" || addr == ":0"
} }
func isUUIDSet(uuid string) bool {
return uuid != "00000000-0000-0000-0000-000000000000"
}

View File

@@ -126,6 +126,8 @@ const (
Disconnect SignalAction = "DISCONNECT" Disconnect SignalAction = "DISCONNECT"
// ConnNegotiation - action to negotiate connection between peers // ConnNegotiation - action to negotiate connection between peers
ConnNegotiation SignalAction = "CONNECTION_NEGOTIATION" ConnNegotiation SignalAction = "CONNECTION_NEGOTIATION"
// RelayME - action to relay the peer
RelayME SignalAction = "RELAY_ME"
) )
// HostUpdate - struct for host update // HostUpdate - struct for host update
@@ -148,8 +150,13 @@ type Signal struct {
FromHostPubKey string `json:"from_host_pubkey"` FromHostPubKey string `json:"from_host_pubkey"`
TurnRelayEndpoint string `json:"turn_relay_addr"` TurnRelayEndpoint string `json:"turn_relay_addr"`
ToHostPubKey string `json:"to_host_pubkey"` ToHostPubKey string `json:"to_host_pubkey"`
FromHostID string `json:"from_host_id"`
ToHostID string `json:"to_host_id"`
FromNodeID string `json:"from_node_id"`
ToNodeID string `json:"to_node_id"`
Reply bool `json:"reply"` Reply bool `json:"reply"`
Action SignalAction `json:"action"` Action SignalAction `json:"action"`
IsPro bool `json:"is_pro"`
TimeStamp int64 `json:"timestamp"` TimeStamp int64 `json:"timestamp"`
} }

View File

@@ -29,6 +29,7 @@ type Metric struct {
// IDandAddr - struct to hold ID and primary Address // IDandAddr - struct to hold ID and primary Address
type IDandAddr struct { type IDandAddr struct {
ID string `json:"id" bson:"id" yaml:"id"` ID string `json:"id" bson:"id" yaml:"id"`
HostID string `json:"host_id"`
Address string `json:"address" bson:"address" yaml:"address"` Address string `json:"address" bson:"address" yaml:"address"`
Name string `json:"name" bson:"name" yaml:"name"` Name string `json:"name" bson:"name" yaml:"name"`
IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"` IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`

View File

@@ -70,3 +70,8 @@ type FwUpdate struct {
IsEgressGw bool `json:"is_egress_gw"` IsEgressGw bool `json:"is_egress_gw"`
EgressInfo map[string]EgressInfo `json:"egress_info"` EgressInfo map[string]EgressInfo `json:"egress_info"`
} }
// FailOverMeReq - struct for failover req
// NodeID is (de)serialized under the JSON key "node_id".
// NOTE(review): presumably the body of the failover_me endpoint — confirm against the handler.
type FailOverMeReq struct {
NodeID string `json:"node_id"`
}

View File

@@ -92,8 +92,9 @@ type Node struct {
// == PRO == // == PRO ==
DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"` DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"`
OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"` OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"`
FailoverNode uuid.UUID `json:"failovernode" bson:"failovernode" yaml:"failovernode"` IsFailOver bool `json:"is_fail_over" yaml:"is_fail_over"`
Failover bool `json:"failover" bson:"failover" yaml:"failover"` FailOverPeers map[string]struct{} `json:"fail_over_peers" yaml:"fail_over_peers"`
FailedOverBy uuid.UUID `json:"failed_over_by" yaml:"failed_over_by"`
} }
// LegacyNode - legacy struct for node model // LegacyNode - legacy struct for node model
@@ -432,8 +433,8 @@ func (newNode *Node) Fill(currentNode *Node, isPro bool) { // TODO add new field
if newNode.DefaultACL == "" { if newNode.DefaultACL == "" {
newNode.DefaultACL = currentNode.DefaultACL newNode.DefaultACL = currentNode.DefaultACL
} }
if newNode.Failover != currentNode.Failover { if newNode.IsFailOver != currentNode.IsFailOver {
newNode.Failover = currentNode.Failover newNode.IsFailOver = currentNode.IsFailOver
} }
} }

View File

@@ -49,11 +49,6 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
} }
ifaceDelta := logic.IfaceDelta(&currentNode, &newNode) ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
if servercfg.IsPro && ifaceDelta {
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
slog.Warn("failed to reset failover list during node update", "nodeid", currentNode.ID, "network", currentNode.Network)
}
}
newNode.SetLastCheckIn() newNode.SetLastCheckIn()
if err := logic.UpdateNode(&currentNode, &newNode); err != nil { if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
slog.Error("error saving node", "id", id, "error", err) slog.Error("error saving node", "id", id, "error", err)

View File

@@ -73,6 +73,7 @@ func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
} }
func publish(host *models.Host, dest string, msg []byte) error { func publish(host *models.Host, dest string, msg []byte) error {
encrypted, encryptErr := encryptMsg(host, msg) encrypted, encryptErr := encryptMsg(host, msg)
if encryptErr != nil { if encryptErr != nil {
return encryptErr return encryptErr
@@ -80,6 +81,7 @@ func publish(host *models.Host, dest string, msg []byte) error {
if mqclient == nil { if mqclient == nil {
return errors.New("cannot publish ... mqclient not connected") return errors.New("cannot publish ... mqclient not connected")
} }
if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
var err error var err error
if token.Error() == nil { if token.Error() == nil {

201
pro/controllers/failover.go Normal file
View File

@@ -0,0 +1,201 @@
package controllers
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"github.com/google/uuid"
"github.com/gorilla/mux"
controller "github.com/gravitl/netmaker/controllers"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
proLogic "github.com/gravitl/netmaker/pro/logic"
"golang.org/x/exp/slog"
)
// FailOverHandlers - registers the failover routes on the given router:
// create (POST) and delete (DELETE) on a node, a per-network reset (POST),
// and the host-authorized failover_me (POST)
func FailOverHandlers(r *mux.Router) {
	createHandler := logic.SecurityCheck(true, http.HandlerFunc(createfailOver))
	r.HandleFunc("/api/v1/node/{nodeid}/failover", createHandler).Methods(http.MethodPost)

	deleteHandler := logic.SecurityCheck(true, http.HandlerFunc(deletefailOver))
	r.HandleFunc("/api/v1/node/{nodeid}/failover", deleteHandler).Methods(http.MethodDelete)

	resetHandler := logic.SecurityCheck(true, http.HandlerFunc(resetFailOver))
	r.HandleFunc("/api/v1/node/{network}/failover/reset", resetHandler).Methods(http.MethodPost)

	failOverMeHandler := controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))
	r.HandleFunc("/api/v1/node/{nodeid}/failover_me", failOverMeHandler).Methods(http.MethodPost)
}
// swagger:route POST /api/v1/node/{nodeid}/failover node createfailOver
//
// Mark a node as the failover node of its network.
//
// The request is rejected with a bad-request error when the node cannot be
// found, when its network already has a failover node, when its host cannot
// be loaded or is not running Linux, or when the node is currently relayed.
//
// Schemes: https
//
// Security:
// oauth
//
// Responses:
// 200: nodeResponse
func createfailOver(w http.ResponseWriter, r *http.Request) {
	nodeid := mux.Vars(r)["nodeid"]
	// confirm node exists
	node, err := logic.GetNodeByID(nodeid)
	if err != nil {
		slog.Error("failed to get node:", "error", err.Error())
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
		return
	}
	// the handler allows only one failover node per network
	if _, exists := proLogic.FailOverExists(node.Network); exists {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover exists already in the network"), "badrequest"))
		return
	}
	host, err := logic.GetHost(node.HostID.String())
	if err != nil {
		// separate the context from the cause; the two used to be glued together
		logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("error getting host: %w", err), "badrequest"))
		return
	}
	if host.OS != models.OS_Types.Linux {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("only linux nodes can act as failovers"), "badrequest"))
		return
	}
	if node.IsRelayed {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("cannot set relayed node as failover"), "badrequest"))
		return
	}
	node.IsFailOver = true
	if err = logic.UpsertNode(&node); err != nil {
		slog.Error("failed to upsert node", "node", node.ID.String(), "error", err)
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
		return
	}
	// peers are notified in the background; the response does not wait for it
	go mq.PublishPeerUpdate()
	w.Header().Set("Content-Type", "application/json")
	logic.ReturnSuccessResponseWithJson(w, r, node, "created failover successfully")
}
// resetFailOver - clears failover state for the network named by the
// {network} path variable.
//
// Every node in that network whose FailedOverBy is set gets FailedOverBy
// reset to uuid.Nil and FailOverPeers replaced with an empty map. The reset
// is best effort: a node that fails to persist is logged and the remaining
// nodes are still processed. A peer update is then published in the
// background and a success response is returned.
func resetFailOver(w http.ResponseWriter, r *http.Request) {
	network := mux.Vars(r)["network"]
	nodes, err := logic.GetNetworkNodes(network)
	if err != nil {
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
		return
	}
	for _, node := range nodes {
		if node.FailedOverBy == uuid.Nil {
			// node is not failed over, nothing to reset
			continue
		}
		node.FailedOverBy = uuid.Nil
		node.FailOverPeers = make(map[string]struct{})
		if err := logic.UpsertNode(&node); err != nil {
			// do not abort the whole reset because one node could not be saved
			slog.Error("failed to upsert node", "node", node.ID.String(), "error", err)
		}
	}
	go mq.PublishPeerUpdate()
	w.Header().Set("Content-Type", "application/json")
	logic.ReturnSuccessResponse(w, r, "failover has been reset successfully")
}
// swagger:route DELETE /api/v1/node/{nodeid}/failover node deletefailOver
//
// Stop a node from acting as the failover node of its network.
//
// The node's IsFailOver flag is cleared and saved. The failover state of
// the nodes that were failed over by it is then reset, and a peer update
// published, in the background.
//
// Schemes: https
//
// Security:
// oauth
//
// Responses:
// 200: nodeResponse
func deletefailOver(w http.ResponseWriter, r *http.Request) {
	nodeid := mux.Vars(r)["nodeid"]
	// confirm node exists
	node, err := logic.GetNodeByID(nodeid)
	if err != nil {
		slog.Error("failed to get node:", "error", err.Error())
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
		return
	}
	node.IsFailOver = false
	if err = logic.UpsertNode(&node); err != nil {
		slog.Error("failed to upsert node", "node", node.ID.String(), "error", err)
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
		return
	}
	go func() {
		// Reset FailOvered Peers; the response has already been sent by the
		// time this can fail, so the error can only be logged
		if err := proLogic.ResetFailOver(&node); err != nil {
			slog.Error("failed to reset failover", "node", node.ID.String(), "error", err)
		}
		mq.PublishPeerUpdate()
	}()
	w.Header().Set("Content-Type", "application/json")
	logic.ReturnSuccessResponseWithJson(w, r, node, "deleted failover successfully")
}
// swagger:route POST /api/v1/node/{nodeid}/failover_me node failOver_me
//
// Record a failover between the calling node and one of its peers.
//
// The request body is decoded into models.FailOverMeReq, whose NodeID names
// the peer. The request is rejected when the network has no failover node,
// when either node cannot be found, or when either node is relayed or is
// itself acting as failover.
//
// Schemes: https
//
// Security:
// oauth
//
// Responses:
// 200: nodeResponse
func failOverME(w http.ResponseWriter, r *http.Request) {
	nodeid := mux.Vars(r)["nodeid"]
	// confirm node exists
	node, err := logic.GetNodeByID(nodeid)
	if err != nil {
		logger.Log(0, r.Header.Get("user"), "failed to get node:", err.Error())
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
		return
	}
	failOverNode, exists := proLogic.FailOverExists(node.Network)
	if !exists {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node doesn't exist in the network"), "badrequest"))
		return
	}
	var failOverReq models.FailOverMeReq
	if err = json.NewDecoder(r.Body).Decode(&failOverReq); err != nil {
		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
		return
	}
	// NOTE(review): nothing here checks that the peer belongs to node.Network
	// or differs from the caller - confirm whether callers guarantee that.
	peerNode, err := logic.GetNodeByID(failOverReq.NodeID)
	if err != nil {
		slog.Error("peer not found: ", "nodeid", failOverReq.NodeID, "error", err)
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("peer not found"), "badrequest"))
		return
	}
	if node.IsRelayed || node.IsFailOver {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("node is relayed or acting as failover"), "badrequest"))
		return
	}
	if peerNode.IsRelayed || peerNode.IsFailOver {
		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("peer node is relayed or acting as failover"), "badrequest"))
		return
	}
	if err = proLogic.SetFailOverCtx(failOverNode, node, peerNode); err != nil {
		slog.Error("failed to create failover", "id", node.ID.String(),
			"network", node.Network, "error", err)
		logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("failed to create failover: %w", err), "internal"))
		return
	}
	slog.Info("[auto-relay] created relay on node", "node", node.ID.String(), "network", node.Network)
	// every successful request changes peer state, so the update is always sent
	go mq.PublishPeerUpdate()
	w.Header().Set("Content-Type", "application/json")
	logic.ReturnSuccessResponse(w, r, "relayed successfully")
}

View File

@@ -3,9 +3,11 @@ package controllers
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
proLogic "github.com/gravitl/netmaker/pro/logic"
"net/http" "net/http"
"github.com/google/uuid"
proLogic "github.com/gravitl/netmaker/pro/logic"
"github.com/gorilla/mux" "github.com/gorilla/mux"
controller "github.com/gravitl/netmaker/controllers" controller "github.com/gravitl/netmaker/controllers"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
@@ -19,6 +21,7 @@ func RelayHandlers(r *mux.Router) {
r.HandleFunc("/api/nodes/{network}/{nodeid}/createrelay", controller.Authorize(false, true, "user", http.HandlerFunc(createRelay))).Methods(http.MethodPost) r.HandleFunc("/api/nodes/{network}/{nodeid}/createrelay", controller.Authorize(false, true, "user", http.HandlerFunc(createRelay))).Methods(http.MethodPost)
r.HandleFunc("/api/nodes/{network}/{nodeid}/deleterelay", controller.Authorize(false, true, "user", http.HandlerFunc(deleteRelay))).Methods(http.MethodDelete) r.HandleFunc("/api/nodes/{network}/{nodeid}/deleterelay", controller.Authorize(false, true, "user", http.HandlerFunc(deleteRelay))).Methods(http.MethodDelete)
r.HandleFunc("/api/v1/host/{hostid}/failoverme", controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))).Methods(http.MethodPost)
} }
// swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay // swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay
@@ -51,6 +54,15 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
for _, relayedNodeID := range relayNode.RelayedNodes {
relayedNode, err := logic.GetNodeByID(relayedNodeID)
if err == nil {
if relayedNode.FailedOverBy != uuid.Nil {
go logic.ResetFailedOverPeer(&relayedNode)
}
}
}
go mq.PublishPeerUpdate() go mq.PublishPeerUpdate()
logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID) logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
apiNode := relayNode.ConvertToAPINode() apiNode := relayNode.ConvertToAPINode()

View File

@@ -27,6 +27,7 @@ func InitPro() {
proControllers.MetricHandlers, proControllers.MetricHandlers,
proControllers.RelayHandlers, proControllers.RelayHandlers,
proControllers.UserHandlers, proControllers.UserHandlers,
proControllers.FailOverHandlers,
) )
logic.EnterpriseCheckFuncs = append(logic.EnterpriseCheckFuncs, func() { logic.EnterpriseCheckFuncs = append(logic.EnterpriseCheckFuncs, func() {
// == License Handling == // == License Handling ==
@@ -42,11 +43,9 @@ func InitPro() {
if servercfg.GetServerConfig().RacAutoDisable { if servercfg.GetServerConfig().RacAutoDisable {
AddRacHooks() AddRacHooks()
} }
resetFailover()
}) })
logic.EnterpriseFailoverFunc = proLogic.SetFailover logic.ResetFailOver = proLogic.ResetFailOver
logic.EnterpriseResetFailoverFunc = proLogic.ResetFailover logic.ResetFailedOverPeer = proLogic.ResetFailedOverPeer
logic.EnterpriseResetAllPeersFailovers = proLogic.WipeAffectedFailoversOnly
logic.DenyClientNodeAccess = proLogic.DenyClientNode logic.DenyClientNodeAccess = proLogic.DenyClientNode
logic.IsClientNodeAllowed = proLogic.IsClientNodeAllowed logic.IsClientNodeAllowed = proLogic.IsClientNodeAllowed
logic.AllowClientNodeAccess = proLogic.RemoveDeniedNodeFromClient logic.AllowClientNodeAccess = proLogic.RemoveDeniedNodeFromClient
@@ -65,18 +64,6 @@ func InitPro() {
mq.UpdateMetrics = proLogic.MQUpdateMetrics mq.UpdateMetrics = proLogic.MQUpdateMetrics
} }
// resetFailover calls proLogic.ResetFailover for every network returned by
// logic.GetNetworks. If the networks cannot be listed nothing is done; a
// failure on one network is logged and the remaining networks are still
// processed.
func resetFailover() {
	networks, err := logic.GetNetworks()
	if err != nil {
		return
	}
	for i := range networks {
		netID := networks[i].NetID
		if resetErr := proLogic.ResetFailover(netID); resetErr != nil {
			slog.Error("failed to reset failover", "network", netID, "error", resetErr.Error())
		}
	}
}
func retrieveProLogo() string { func retrieveProLogo() string {
return ` return `
__ __ ______ ______ __ __ ______ __ __ ______ ______ __ __ ______ ______ __ __ ______ __ __ ______ ______

View File

@@ -1,122 +1,97 @@
package logic package logic
import ( import (
"errors"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
) )
// SetFailover - finds a suitable failover candidate and sets it func SetFailOverCtx(failOverNode, victimNode, peerNode models.Node) error {
func SetFailover(node *models.Node) error { if peerNode.FailOverPeers == nil {
failoverNode := determineFailoverCandidate(node) peerNode.FailOverPeers = make(map[string]struct{})
if failoverNode != nil { }
return setFailoverNode(failoverNode, node) if victimNode.FailOverPeers == nil {
victimNode.FailOverPeers = make(map[string]struct{})
}
peerNode.FailOverPeers[victimNode.ID.String()] = struct{}{}
victimNode.FailOverPeers[peerNode.ID.String()] = struct{}{}
victimNode.FailedOverBy = failOverNode.ID
peerNode.FailedOverBy = failOverNode.ID
if err := logic.UpsertNode(&failOverNode); err != nil {
return err
}
if err := logic.UpsertNode(&victimNode); err != nil {
return err
}
if err := logic.UpsertNode(&peerNode); err != nil {
return err
} }
return nil return nil
} }
// ResetFailover - sets the failover node and wipes disconnected status // GetFailOverNode - gets the host acting as failOver
func ResetFailover(network string) error { func GetFailOverNode(network string, allNodes []models.Node) (models.Node, error) {
nodes := logic.GetNetworkNodesMemory(allNodes, network)
for _, node := range nodes {
if node.IsFailOver {
return node, nil
}
}
return models.Node{}, errors.New("auto relay not found")
}
// FailOverExists - checks if failOver exists already in the network
func FailOverExists(network string) (failOverNode models.Node, exists bool) {
nodes, err := logic.GetNetworkNodes(network) nodes, err := logic.GetNetworkNodes(network)
if err != nil {
return
}
for _, node := range nodes {
if node.IsFailOver {
exists = true
failOverNode = node
return
}
}
return
}
// ResetFailedOverPeer - removes failed over node from network peers
func ResetFailedOverPeer(failedOveredNode *models.Node) error {
nodes, err := logic.GetNetworkNodes(failedOveredNode.Network)
if err != nil {
return err
}
failedOveredNode.FailedOverBy = uuid.Nil
failedOveredNode.FailOverPeers = make(map[string]struct{})
err = logic.UpsertNode(failedOveredNode)
if err != nil { if err != nil {
return err return err
} }
for _, node := range nodes { for _, node := range nodes {
node := node if node.FailOverPeers == nil || node.ID == failedOveredNode.ID {
err = SetFailover(&node)
if err != nil {
logger.Log(2, "error setting failover for node", node.ID.String(), ":", err.Error())
}
err = WipeFailover(node.ID.String())
if err != nil {
logger.Log(2, "error wiping failover for node", node.ID.String(), ":", err.Error())
}
}
return nil
}
// determineFailoverCandidate - returns a list of nodes that
// are suitable for relaying a given node
func determineFailoverCandidate(nodeToBeRelayed *models.Node) *models.Node {
currentNetworkNodes, err := logic.GetNetworkNodes(nodeToBeRelayed.Network)
if err != nil {
return nil
}
currentMetrics, err := GetMetrics(nodeToBeRelayed.ID.String())
if err != nil || currentMetrics == nil || currentMetrics.Connectivity == nil {
return nil
}
minLatency := int64(9223372036854775807) // max signed int64 value
var fastestCandidate *models.Node
for i := range currentNetworkNodes {
if currentNetworkNodes[i].ID == nodeToBeRelayed.ID {
continue continue
} }
delete(node.FailOverPeers, failedOveredNode.ID.String())
if currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Connected && (currentNetworkNodes[i].Failover) { logic.UpsertNode(&node)
if currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Latency < int64(minLatency) {
fastestCandidate = &currentNetworkNodes[i]
minLatency = currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Latency
} }
} return nil
}
return fastestCandidate
} }
// setFailoverNode - changes node's failover node // ResetFailOver - reset failovered peers
func setFailoverNode(failoverNode, node *models.Node) error { func ResetFailOver(failOverNode *models.Node) error {
// Unset FailedOverPeers
node.FailoverNode = failoverNode.ID nodes, err := logic.GetNetworkNodes(failOverNode.Network)
nodeToUpdate, err := logic.GetNodeByID(node.ID.String())
if err != nil { if err != nil {
return err return err
} }
if nodeToUpdate.FailoverNode == failoverNode.ID { for _, node := range nodes {
return nil if node.FailedOverBy == failOverNode.ID {
} node.FailedOverBy = uuid.Nil
return logic.UpdateNode(&nodeToUpdate, node) node.FailOverPeers = make(map[string]struct{})
} logic.UpsertNode(&node)
// WipeFailover - removes the failover peers of given node (ID)
func WipeFailover(nodeid string) error {
metrics, err := GetMetrics(nodeid)
if err != nil {
return err
}
if metrics != nil {
metrics.FailoverPeers = make(map[string]string)
return logic.UpdateMetrics(nodeid, metrics)
}
return nil
}
// WipeAffectedFailoversOnly - wipes failovers for nodes that have given node (ID)
// in their respective failover lists
func WipeAffectedFailoversOnly(nodeid uuid.UUID, network string) error {
currentNetworkNodes, err := logic.GetNetworkNodes(network)
if err != nil {
return nil
}
WipeFailover(nodeid.String())
for i := range currentNetworkNodes {
currNodeID := currentNetworkNodes[i].ID
if currNodeID == nodeid {
continue
}
currMetrics, err := GetMetrics(currNodeID.String())
if err != nil || currMetrics == nil {
continue
}
if currMetrics.FailoverPeers != nil {
if len(currMetrics.FailoverPeers[nodeid.String()]) > 0 {
WipeFailover(currNodeID.String())
}
} }
} }
return nil return nil

View File

@@ -2,6 +2,9 @@ package logic
import ( import (
"encoding/json" "encoding/json"
"math"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
@@ -10,8 +13,6 @@ import (
"github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
"math"
"time"
) )
// GetMetrics - gets the metrics // GetMetrics - gets the metrics
@@ -80,13 +81,6 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
} }
} }
if newMetrics.Connectivity != nil {
err := logic.EnterpriseFailoverFunc(&currentNode)
if err != nil {
slog.Error("failed to failover for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
}
}
if shouldUpdate { if shouldUpdate {
slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network) slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
host, err := logic.GetHost(currentNode.HostID.String()) host, err := logic.GetHost(currentNode.HostID.String())
@@ -170,21 +164,6 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo
} }
// add nodes that need failover
nodes, err := logic.GetNetworkNodes(currentNode.Network)
if err != nil {
slog.Error("failed to retrieve nodes while updating metrics", "error", err)
return false
}
for _, node := range nodes {
if !newMetrics.Connectivity[node.ID.String()].Connected &&
len(newMetrics.Connectivity[node.ID.String()].NodeName) > 0 &&
node.Connected &&
len(node.FailoverNode) > 0 &&
!node.Failover {
newMetrics.FailoverPeers[node.ID.String()] = node.FailoverNode.String()
}
}
shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0 shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
for k, v := range oldMetrics.FailoverPeers { for k, v := range oldMetrics.FailoverPeers {
if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 { if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {

View File

@@ -69,7 +69,7 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
continue continue
} }
node.IsRelayed = setRelayed node.IsRelayed = setRelayed
if node.IsRelayed { if setRelayed {
node.RelayedBy = relay node.RelayedBy = relay
} else { } else {
node.RelayedBy = "" node.RelayedBy = ""
@@ -155,6 +155,7 @@ func UpdateRelayed(currentNode, newNode *models.Node) {
if len(updatenodes) > 0 { if len(updatenodes) > 0 {
for _, relayedNode := range updatenodes { for _, relayedNode := range updatenodes {
node := relayedNode node := relayedNode
ResetFailedOverPeer(&node)
go func() { go func() {
if err := mq.NodeUpdate(&node); err != nil { if err := mq.NodeUpdate(&node); err != nil {
slog.Error("error publishing node update to node", "node", node.ID, "error", err) slog.Error("error publishing node update to node", "node", node.ID, "error", err)

View File

@@ -1,3 +1,4 @@
// go:build ee
//go:build ee //go:build ee
// +build ee // +build ee