From f95306ddbd3289b82dd71c737dc28af6d9de738c Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Sun, 20 Nov 2022 09:35:29 +0530 Subject: [PATCH] make proxy optional --- compose/docker-compose.yml | 1 + config/config.go | 1 + controllers/node.go | 8 ++ logic/peers.go | 146 ++++++++++++++++-------------- logic/server.go | 19 ++-- logic/wireguard.go | 20 ++-- main.go | 33 ++++--- models/node.go | 1 + models/structs.go | 10 +- mq/publishers.go | 61 ++++++++----- netclient/cli_options/flags.go | 7 ++ netclient/config/config.go | 4 + netclient/functions/daemon.go | 28 ++---- netclient/functions/mqhandlers.go | 18 +++- netclient/functions/pull.go | 47 ++++++---- nm-proxy/common/common.go | 1 + nm-proxy/manager/manager.go | 42 ++++++++- nm-proxy/wg/wg.go | 4 +- servercfg/serverconf.go | 10 ++ 19 files changed, 294 insertions(+), 167 deletions(-) diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml index 54b99c63..1c07744f 100644 --- a/compose/docker-compose.yml +++ b/compose/docker-compose.yml @@ -41,6 +41,7 @@ services: PORT_FORWARD_SERVICES: "dns" MQ_ADMIN_PASSWORD: "REPLACE_MQ_ADMIN_PASSWORD" STUN_PORT: "3478" + PROXY: "on" ports: - "51821-51830:51821-51830/udp" expose: diff --git a/config/config.go b/config/config.go index e0945a53..443af912 100644 --- a/config/config.go +++ b/config/config.go @@ -77,6 +77,7 @@ type ServerConfig struct { NetmakerAccountID string `yaml:"netmaker_account_id"` IsEE string `yaml:"is_ee"` StunPort string `yaml:"stun_port"` + Proxy string `yaml:"proxy"` } // SQLConfig - Generic SQL Config diff --git a/controllers/node.go b/controllers/node.go index 1bd3dd1b..ec6fdfb7 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -481,6 +481,14 @@ func getNode(w http.ResponseWriter, r *http.Request) { ServerConfig: servercfg.GetServerInfo(), PeerIDs: peerUpdate.PeerIDs, } + if node.Proxy { + proxyPayload, err := logic.GetPeersForProxy(&node, false) + if err == nil { + response.ProxyUpdate = proxyPayload + } else { + logger.Log(0, "failed to get proxy update: ", err.Error()) + } + } if servercfg.Is_EE && nodeRequest { if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { diff --git a/logic/peers.go b/logic/peers.go index f6505646..cfd51b2e 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -13,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic/acls/nodeacls" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/nm-proxy/manager" "github.com/gravitl/netmaker/servercfg" "golang.org/x/exp/slices" @@ -82,7 +83,11 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload logger.Log(1, "failed to parse node pub key: ", peer.ID) continue } - endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", peer.Endpoint, peer.LocalListenPort)) + listenPort := peer.LocalListenPort + if listenPort == 0 { + listenPort = peer.ListenPort + } + endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", peer.Endpoint, listenPort)) if err != nil { logger.Log(1, "failed to resolve udp addr for node: ", peer.ID, peer.Endpoint, err.Error()) continue @@ -93,6 +98,10 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload // set_keepalive keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s") } + proxyStatus := peer.Proxy + if peer.Server == "yes" { + proxyStatus = servercfg.IsProxyEnabled() + } peers = append(peers, wgtypes.PeerConfig{ PublicKey: pubkey, Endpoint: endpoint, @@ -102,7 +111,9 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload }) peerConfMap[peer.PublicKey] = manager.PeerConf{ Address: peer.PrimaryAddress(), + Proxy: proxyStatus, } + if !onlyPeers && peer.IsRelayed == "yes" { relayNode := FindRelay(&peer) if relayNode != nil { @@ -113,6 +124,7 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload IsRelayed: true, RelayedTo: relayTo, Address: peer.PrimaryAddress(), + Proxy: proxyStatus, } } @@ -171,9 +183,9 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { return models.PeerUpdate{}, err } - // if node.IsRelayed == "yes" { - // return GetPeerUpdateForRelayedNode(node, udppeers) - // } + if node.IsRelayed == "yes" { + return GetPeerUpdateForRelayedNode(node, udppeers) + } // #1 Set Keepalive values: set_keepalive // #2 Set local address: set_local - could be a LOT BETTER and fix some bugs with additional logic @@ -195,15 +207,15 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { // if the node is not a server, set the endpoint var setEndpoint = !(node.IsServer == "yes") - // if peer.IsRelayed == "yes" { - // if !(node.IsRelay == "yes" && ncutils.StringSliceContains(node.RelayAddrs, peer.PrimaryAddress())) { - // //skip -- will be added to relay - // continue - // } else if node.IsRelay == "yes" && ncutils.StringSliceContains(node.RelayAddrs, peer.PrimaryAddress()) { - // // dont set peer endpoint if it's relayed by node - // setEndpoint = false - // } - // } + if peer.IsRelayed == "yes" { + if !(node.IsRelay == "yes" && ncutils.StringSliceContains(node.RelayAddrs, peer.PrimaryAddress())) { + //skip -- will be added to relay + continue + } else if node.IsRelay == "yes" && ncutils.StringSliceContains(node.RelayAddrs, peer.PrimaryAddress()) { + // dont set peer endpoint if it's relayed by node + setEndpoint = false + } + } if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID), nodeacls.NodeID(peer.ID)) { //skip if not permitted by acl continue @@ -478,60 +490,60 @@ func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet } } // handle relay gateway peers - // if peer.IsRelay == "yes" { - // for _, ip := range peer.RelayAddrs { - // //find node ID of relayed peer - // relayedPeer, err := findNode(ip) - // if err != nil { - // logger.Log(0, "failed to find node for ip ", ip, err.Error()) - // continue - // } - // if relayedPeer == nil { - // continue - // } - // if relayedPeer.ID == node.ID { - // //skip self - // continue - // } - // //check if acl permits comms - // if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID), nodeacls.NodeID(relayedPeer.ID)) { - // continue - // } - // if iplib.Version(net.ParseIP(ip)) == 4 { - // relayAddr := net.IPNet{ - // IP: net.ParseIP(ip), - // Mask: net.CIDRMask(32, 32), - // } - // allowedips = append(allowedips, relayAddr) - // } - // if iplib.Version(net.ParseIP(ip)) == 6 { - // relayAddr := net.IPNet{ - // IP: net.ParseIP(ip), - // Mask: net.CIDRMask(128, 128), - // } - // allowedips = append(allowedips, relayAddr) - // } - // relayedNode, err := findNode(ip) - // if err != nil { - // logger.Log(1, "unable to find node for relayed address", ip, err.Error()) - // continue - // } - // if relayedNode.IsEgressGateway == "yes" { - // extAllowedIPs := getEgressIPs(node, relayedNode) - // allowedips = append(allowedips, extAllowedIPs...) - // } - // if relayedNode.IsIngressGateway == "yes" { - // extPeers, _, err := getExtPeers(relayedNode) - // if err == nil { - // for _, extPeer := range extPeers { - // allowedips = append(allowedips, extPeer.AllowedIPs...) - // } - // } else { - // logger.Log(0, "failed to retrieve extclients from relayed ingress", err.Error()) - // } - // } - // } - // } + if peer.IsRelay == "yes" { + for _, ip := range peer.RelayAddrs { + //find node ID of relayed peer + relayedPeer, err := findNode(ip) + if err != nil { + logger.Log(0, "failed to find node for ip ", ip, err.Error()) + continue + } + if relayedPeer == nil { + continue + } + if relayedPeer.ID == node.ID { + //skip self + continue + } + //check if acl permits comms + if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID), nodeacls.NodeID(relayedPeer.ID)) { + continue + } + if iplib.Version(net.ParseIP(ip)) == 4 { + relayAddr := net.IPNet{ + IP: net.ParseIP(ip), + Mask: net.CIDRMask(32, 32), + } + allowedips = append(allowedips, relayAddr) + } + if iplib.Version(net.ParseIP(ip)) == 6 { + relayAddr := net.IPNet{ + IP: net.ParseIP(ip), + Mask: net.CIDRMask(128, 128), + } + allowedips = append(allowedips, relayAddr) + } + relayedNode, err := findNode(ip) + if err != nil { + logger.Log(1, "unable to find node for relayed address", ip, err.Error()) + continue + } + if relayedNode.IsEgressGateway == "yes" { + extAllowedIPs := getEgressIPs(node, relayedNode) + allowedips = append(allowedips, extAllowedIPs...) + } + if relayedNode.IsIngressGateway == "yes" { + extPeers, _, err := getExtPeers(relayedNode) + if err == nil { + for _, extPeer := range extPeers { + allowedips = append(allowedips, extPeer.AllowedIPs...) + } + } else { + logger.Log(0, "failed to retrieve extclients from relayed ingress", err.Error()) + } + } + } + } return allowedips } diff --git a/logic/server.go b/logic/server.go index e3da3a66..a13cfeff 100644 --- a/logic/server.go +++ b/logic/server.go @@ -175,16 +175,19 @@ func ServerJoin(networkSettings *models.Network) (models.Node, error) { if err != nil { return returnNode, err } - proxyPayload, err := GetPeersForProxy(node, false) - if err != nil && !ncutils.IsEmptyRecord(err) { - logger.Log(1, "failed to retrieve peers") - return returnNode, err + if servercfg.IsProxyEnabled() { + proxyPayload, err := GetPeersForProxy(node, false) + if err != nil && !ncutils.IsEmptyRecord(err) { + logger.Log(1, "failed to retrieve peers") + return returnNode, err + } + + ProxyMgmChan <- &manager.ManagerAction{ + Action: manager.AddInterface, + Payload: proxyPayload, + } } - ProxyMgmChan <- &manager.ManagerAction{ - Action: manager.AddInterface, - Payload: proxyPayload, - } return *node, nil } diff --git a/logic/wireguard.go b/logic/wireguard.go index c05dd95e..095c68c5 100644 --- a/logic/wireguard.go +++ b/logic/wireguard.go @@ -10,6 +10,7 @@ import ( "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/wireguard" "github.com/gravitl/netmaker/nm-proxy/manager" + "github.com/gravitl/netmaker/servercfg" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) @@ -160,17 +161,20 @@ func setWGConfig(node *models.Node, peerupdate bool) error { logger.Log(3, "finished setting wg config on server", node.Name) } - logger.Log(0, "--------> ADD/Update INTERFACE TO PROXY.....") - proxyPayload, err := GetPeersForProxy(node, false) - if err != nil { - logger.Log(0, "failed to get peers for proxy: ", err.Error()) - } else { + if servercfg.IsProxyEnabled() { + logger.Log(0, "--------> ADD/Update INTERFACE TO PROXY.....") + proxyPayload, err := GetPeersForProxy(node, false) + if err != nil { + logger.Log(0, "failed to get peers for proxy: ", err.Error()) + } else { - ProxyMgmChan <- &manager.ManagerAction{ - Action: manager.AddInterface, - Payload: proxyPayload, + ProxyMgmChan <- &manager.ManagerAction{ + Action: manager.AddInterface, + Payload: proxyPayload, + } } } + return nil } diff --git a/main.go b/main.go index 5fb8607a..7d25594b 100644 --- a/main.go +++ b/main.go @@ -172,24 +172,29 @@ func startControllers() { if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() { logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.") } + // starts the stun server waitnetwork.Add(1) go stunserver.Start(&waitnetwork) - waitnetwork.Add(1) - go func() { - defer waitnetwork.Done() - ctx, cancel := context.WithCancel(context.Background()) + if servercfg.IsProxyEnabled() { + waitnetwork.Add(1) - go nmproxy.Start(ctx, logic.ProxyMgmChan, servercfg.GetAPIHost()) - err := serverctl.SyncServerNetworkWithProxy() - if err != nil { - logger.Log(0, "failed to sync proxy with server interfaces: ", err.Error()) - } - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGTERM, os.Interrupt) - <-quit - cancel() - }() + go func() { + defer waitnetwork.Done() + ctx, cancel := context.WithCancel(context.Background()) + waitnetwork.Add(1) + + go nmproxy.Start(ctx, logic.ProxyMgmChan, servercfg.GetAPIHost()) + err := serverctl.SyncServerNetworkWithProxy() + if err != nil { + logger.Log(0, "failed to sync proxy with server interfaces: ", err.Error()) + } + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGTERM, os.Interrupt) + <-quit + cancel() + }() + } waitnetwork.Wait() } diff --git a/models/node.go b/models/node.go index c4641d35..2d084686 100644 --- a/models/node.go +++ b/models/node.go @@ -106,6 +106,7 @@ type Node struct { DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"` OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"` Failover string `json:"failover" bson:"failover" yaml:"failover" validate:"checkyesorno"` + Proxy bool `json:"proxy" bson:"proxy" yaml:"proxy"` } // NodesArray - used for node sorting diff --git a/models/structs.go b/models/structs.go index feecf9e3..34c53ddd 100644 --- a/models/structs.go +++ b/models/structs.go @@ -5,6 +5,7 @@ import ( "time" jwt "github.com/golang-jwt/jwt/v4" + "github.com/gravitl/netmaker/nm-proxy/manager" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) @@ -202,10 +203,11 @@ type TrafficKeys struct { // NodeGet - struct for a single node get response type NodeGet struct { - Node Node `json:"node" bson:"node" yaml:"node"` - Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` - ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"` - PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"` + Node Node `json:"node" bson:"node" yaml:"node"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` + ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"` + PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"` + ProxyUpdate manager.ManagerPayload `json:"proxy_update,omitempty" bson:"proxy_update,omitempty" yaml:"proxy_update,omitempty"` } // ServerConfig - struct for dealing with the server information for a netclient diff --git a/mq/publishers.go b/mq/publishers.go index 01787482..0e75097f 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -31,10 +31,13 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error { // logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error()) // } if node.IsServer == "yes" { - err := PublishProxyUpdate(manager.AddInterface, &node) - if err != nil { - logger.Log(0, "failed to send proxy update for server: ", err.Error()) + if servercfg.IsProxyEnabled() { + err := PublishProxyUpdate(manager.AddInterface, &node) + if err != nil { + logger.Log(0, "failed to send proxy update for server: ", err.Error()) + } } + continue } if !publishToSelf && newNode.ID == node.ID { @@ -67,18 +70,22 @@ func PublishProxyUpdate(action manager.ProxyAction, node *models.Node) error { // PublishSinglePeerUpdate --- determines and publishes a peer update to one node func PublishSinglePeerUpdate(node *models.Node) error { + peerUpdate, err := logic.GetPeerUpdate(node) if err != nil { return err } - proxyUpdate, err := logic.GetPeersForProxy(node, false) - if err != nil { - return err - } - peerUpdate.ProxyUpdate = manager.ManagerAction{ - Action: manager.AddInterface, - Payload: proxyUpdate, + if node.Proxy { + proxyUpdate, err := logic.GetPeersForProxy(node, false) + if err != nil { + return err + } + peerUpdate.ProxyUpdate = manager.ManagerAction{ + Action: manager.AddInterface, + Payload: proxyUpdate, + } } + data, err := json.Marshal(&peerUpdate) if err != nil { return err @@ -117,6 +124,7 @@ func PublishExtPeerUpdate(node *models.Node) error { // NodeUpdate -- publishes a node update func NodeUpdate(node *models.Node) error { + var err error if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" { return nil @@ -127,19 +135,23 @@ func NodeUpdate(node *models.Node) error { node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them) } - data, err := json.Marshal(node) - if err != nil { - logger.Log(2, "error marshalling node update ", err.Error()) - return err - } - if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) - return err - } - err = PublishProxyUpdate(manager.AddInterface, node) - if err != nil { - logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error()) + if node.Proxy { + err = PublishProxyUpdate(manager.AddInterface, node) + if err != nil { + logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error()) + } + } else { + data, err := json.Marshal(node) + if err != nil { + logger.Log(2, "error marshalling node update ", err.Error()) + return err + } + if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) + return err + } } + return nil } @@ -224,7 +236,10 @@ func sendPeers() { if errN != nil { logger.Log(1, errN.Error()) } - serverctl.SyncServerNetworkWithProxy() + if servercfg.IsProxyEnabled() { + serverctl.SyncServerNetworkWithProxy() + } + } } } diff --git a/netclient/cli_options/flags.go b/netclient/cli_options/flags.go index 89204dd2..a7c2da88 100644 --- a/netclient/cli_options/flags.go +++ b/netclient/cli_options/flags.go @@ -12,6 +12,13 @@ func GetFlags(hostname string) []cli.Flag { Value: "all", Usage: "Network to perform specified action against.", }, + &cli.StringFlag{ + Name: "proxy", + // Aliases: []string{"np"}, + EnvVars: []string{"NETMAKER_PROXY"}, + Value: "off", + Usage: "To enable/disable proxy.", + }, &cli.StringFlag{ Name: "password", Aliases: []string{"p"}, diff --git a/netclient/config/config.go b/netclient/config/config.go index 2132c59d..1b5bbe42 100644 --- a/netclient/config/config.go +++ b/netclient/config/config.go @@ -227,6 +227,10 @@ func GetCLIConfig(c *cli.Context) (ClientConfig, string, error) { if c.String("key") != "" { cfg.AccessKey = c.String("key") } + if c.String("proxy") != "" { + cfg.Node.Proxy = c.String("proxy") == "on" + } + log.Println("_______________> PROXY: ", cfg.Node.Proxy) if c.String("network") != "all" { cfg.Network = c.String("network") cfg.Node.Network = c.String("network") diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index f7016332..93c591b4 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -33,7 +33,6 @@ import ( var ProxyMgmChan = make(chan *manager.ManagerAction, 100) var messageCache = new(sync.Map) -var ProxyStatus = "OFF" var serverSet map[string]bool var mqclient mqtt.Client @@ -125,25 +124,16 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc { go Checkin(ctx, wg) if len(networks) != 0 { - go func() { - cfg := config.ClientConfig{} - cfg.Network = networks[0] - cfg.ReadConfig() - apiHost, _, err := net.SplitHostPort(cfg.Server.API) - if err == nil { - if ProxyStatus != "ON" { - ProxyStatus = "ON" - pCtx, pCancel := context.WithCancel(context.Background()) - go nmproxy.Start(pCtx, ProxyMgmChan, apiHost) - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGTERM, os.Interrupt) - <-quit - pCancel() - logger.Log(0, "Proxy Shutting down....") - } + cfg := config.ClientConfig{} + cfg.Network = networks[0] + cfg.ReadConfig() + apiHost, _, err := net.SplitHostPort(cfg.Server.API) + if err == nil { + wg.Add(1) + go nmproxy.Start(ctx, ProxyMgmChan, apiHost) + logger.Log(0, "Proxy Shutting down....") - } - }() + } } diff --git a/netclient/functions/mqhandlers.go b/netclient/functions/mqhandlers.go index d6541da7..d71b4010 100644 --- a/netclient/functions/mqhandlers.go +++ b/netclient/functions/mqhandlers.go @@ -52,8 +52,6 @@ func ProxyUpdate(client mqtt.Client, msg mqtt.Message) { // NodeUpdate -- mqtt message handler for /update/ topic func NodeUpdate(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "----------> RECIEVED NODE UPDDATEEEEE") - return var newNode models.Node var nodeCfg config.ClientConfig var network = parseNetworkFromTopic(msg.Topic()) @@ -69,6 +67,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { logger.Log(0, "error unmarshalling node update data"+err.Error()) return } + if newNode.Proxy { + if newNode.Proxy != nodeCfg.Node.Proxy { + if err := config.Write(&nodeCfg, nodeCfg.Network); err != nil { + logger.Log(0, nodeCfg.Node.Network, "error updating node configuration: ", err.Error()) + } + } + logger.Log(0, "Node is attached with proxy,ignore this node update...") + return + } // see if cache hit, if so skip var currentMessage = read(newNode.Network, lastNodeUpdate) @@ -228,6 +235,11 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { cfg.Server.Version = peerUpdate.ServerVersion config.Write(&cfg, cfg.Network) } + + if cfg.Node.Proxy { + ProxyMgmChan <- &peerUpdate.ProxyUpdate + return + } file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" internetGateway, err := wireguard.UpdateWgPeers(file, peerUpdate.Peers) if err != nil { @@ -269,7 +281,7 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { // logger.Log(0, "error syncing wg after peer update: "+err.Error()) // return // } - ProxyMgmChan <- &peerUpdate.ProxyUpdate + logger.Log(0, "network:", cfg.Node.Network, "received peer update for node "+cfg.Node.Name+" "+cfg.Node.Network) if cfg.Node.DNSOn == "yes" { if err := setHostDNS(peerUpdate.DNS, cfg.Node.Interface, ncutils.IsWindows()); err != nil { diff --git a/netclient/functions/pull.go b/netclient/functions/pull.go index afe7ac94..99ad103e 100644 --- a/netclient/functions/pull.go +++ b/netclient/functions/pull.go @@ -15,6 +15,7 @@ import ( "github.com/gravitl/netmaker/netclient/local" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/wireguard" + "github.com/gravitl/netmaker/nm-proxy/manager" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" //homedir "github.com/mitchellh/go-homedir" ) @@ -62,32 +63,44 @@ func Pull(network string, iface bool) (*models.Node, error) { logger.Log(0, "unable to update server config: "+err.Error()) } } - if nodeGET.Node.ListenPort != cfg.Node.LocalListenPort { - if err := wireguard.RemoveConf(resNode.Interface, false); err != nil { - logger.Log(0, "error remove interface", resNode.Interface, err.Error()) + if nodeGET.Node.Proxy { + ProxyMgmChan <- &manager.ManagerAction{ + Action: manager.AddInterface, + Payload: nodeGET.ProxyUpdate, } - err = ncutils.ModPort(&resNode) - if err != nil { - return nil, err - } - informPortChange(&resNode) } + if !nodeGET.Node.Proxy { + if nodeGET.Node.ListenPort != cfg.Node.LocalListenPort { + if err := wireguard.RemoveConf(resNode.Interface, false); err != nil { + logger.Log(0, "error remove interface", resNode.Interface, err.Error()) + } + err = ncutils.ModPort(&resNode) + if err != nil { + return nil, err + } + informPortChange(&resNode) + } + } + if err = config.ModNodeConfig(&resNode); err != nil { return nil, err } - if iface { - if err = wireguard.SetWGConfig(network, false, nodeGET.Peers[:]); err != nil { - return nil, err - } - } else { - if err = wireguard.SetWGConfig(network, true, nodeGET.Peers[:]); err != nil { - if errors.Is(err, os.ErrNotExist) && !ncutils.IsFreeBSD() { - return Pull(network, true) - } else { + if !nodeGET.Node.Proxy { + if iface { + if err = wireguard.SetWGConfig(network, false, nodeGET.Peers[:]); err != nil { return nil, err } + } else { + if err = wireguard.SetWGConfig(network, true, nodeGET.Peers[:]); err != nil { + if errors.Is(err, os.ErrNotExist) && !ncutils.IsFreeBSD() { + return Pull(network, true) + } else { + return nil, err + } + } } } + var bkupErr = config.SaveBackup(network) if bkupErr != nil { logger.Log(0, "unable to update backup file for", network) diff --git a/nm-proxy/common/common.go b/nm-proxy/common/common.go index 6f02b8d5..7a47e41c 100644 --- a/nm-proxy/common/common.go +++ b/nm-proxy/common/common.go @@ -56,6 +56,7 @@ type Config struct { // Proxy - WireguardProxy proxies type Proxy struct { + Status bool Ctx context.Context Cancel context.CancelFunc diff --git a/nm-proxy/manager/manager.go b/nm-proxy/manager/manager.go index af6c4a9e..aeb4f096 100644 --- a/nm-proxy/manager/manager.go +++ b/nm-proxy/manager/manager.go @@ -54,6 +54,7 @@ type PeerConf struct { IngressGatewayEndPoint *net.UDPAddr `json:"ingress_gateway_endpoint"` IsRelayed bool `json:"is_relayed"` RelayedTo *net.UDPAddr `json:"relayed_to"` + Proxy bool `json:"proxy"` } const ( @@ -169,6 +170,15 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { var wgProxyConf common.WgIfaceConf var ok bool if wgProxyConf, ok = common.WgIFaceMap[m.Payload.InterfaceName]; !ok { + for i := len(m.Payload.Peers) - 1; i >= 0; i-- { + if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy { + log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey) + if err := wgIface.Update(m.Payload.Peers[i], false); err != nil { + log.Println("falied to update peer: ", err) + } + m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...) + } + } return wgIface, nil } if m.Payload.IsRelay { @@ -192,14 +202,31 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { // check device conf different from proxy //wgProxyConf.Iface = wgIface.Device for i := len(m.Payload.Peers) - 1; i >= 0; i-- { + if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok { + // check if proxy is off for the peer + if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy { + + // cleanup proxy connections for the peer + currentPeer.Proxy.Cancel() + time.Sleep(time.Second * 3) + delete(wgProxyConf.PeerMap, currentPeer.Config.Key) + // update the peer with actual endpoint + if err := wgIface.Update(m.Payload.Peers[i], false); err != nil { + log.Println("falied to update peer: ", err) + } + m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...) + continue + + } // check if peer is not connected to proxy devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Config.Key) if err == nil { - log.Printf("---------> COMAPRING ENDP{INT}: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Proxy.LocalConn.LocalAddr().String()) + log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Proxy.LocalConn.LocalAddr().String()) if devPeer.Endpoint.String() != currentPeer.Proxy.LocalConn.LocalAddr().String() { log.Println("---------> endpoint is not set to proxy: ", currentPeer.Config.Key) currentPeer.Proxy.Cancel() + time.Sleep(time.Second * 3) delete(wgProxyConf.PeerMap, currentPeer.Config.Key) continue } @@ -208,6 +235,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { if currentPeer.Config.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed { log.Println("---------> peer relay status has been changed: ", currentPeer.Config.Key) currentPeer.Proxy.Cancel() + time.Sleep(time.Second * 3) delete(wgProxyConf.PeerMap, currentPeer.Config.Key) continue } @@ -217,6 +245,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { currentPeer.Config.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() { log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Config.Key) currentPeer.Proxy.Cancel() + time.Sleep(time.Second * 3) delete(wgProxyConf.PeerMap, currentPeer.Config.Key) continue } @@ -224,6 +253,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { if currentPeer.Proxy.RemoteConn.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() { log.Println("----------> Resetting proxy for Peer: ", currentPeer.Config.Key, m.Payload.InterfaceName) currentPeer.Proxy.Cancel() + time.Sleep(time.Second * 3) delete(wgProxyConf.PeerMap, currentPeer.Config.Key) } else { @@ -234,7 +264,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { if err == nil { updatePeerConf.Endpoint = localUdpAddr } - if err := wgIface.Update(updatePeerConf); err != nil { + if err := wgIface.Update(updatePeerConf, true); err != nil { log.Println("failed to update peer: ", currentPeer.Config.Key, err) } currentPeer.Proxy.Config.PeerConf = &m.Payload.Peers[i] @@ -251,11 +281,18 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) { m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...) } + } else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy { + log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey) + if err := wgIface.Update(m.Payload.Peers[i], false); err != nil { + log.Println("falied to update peer: ", err) + } + m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...) } } for _, currPeerI := range wgProxyConf.PeerMap { if _, ok := m.Payload.PeerMap[currPeerI.Config.Key]; !ok { currPeerI.Proxy.Cancel() + time.Sleep(time.Second * 3) // delete peer from interface log.Println("CurrPeer Not Found, Deleting Peer from Interface: ", currPeerI.Config.Key) if err := wgIface.RemovePeer(currPeerI.Config.Key); err != nil { @@ -306,6 +343,7 @@ func (m *ManagerAction) AddInterfaceToProxy() error { Endpoint: wgListenAddr, } for _, peerI := range m.Payload.Peers { + peerConf := m.Payload.PeerMap[peerI.PublicKey.String()] if peerI.Endpoint == nil && !(peerConf.IsAttachedExtClient || peerConf.IsExtClient) { log.Println("Endpoint nil for peer: ", peerI.PublicKey.String()) diff --git a/nm-proxy/wg/wg.go b/nm-proxy/wg/wg.go index 1352e609..308282a3 100644 --- a/nm-proxy/wg/wg.go +++ b/nm-proxy/wg/wg.go @@ -263,13 +263,13 @@ func (w *WGIface) RemovePeer(peerKey string) error { } // UpdatePeer -func (w *WGIface) Update(peerConf wgtypes.PeerConfig) error { +func (w *WGIface) Update(peerConf wgtypes.PeerConfig, updateOnly bool) error { w.mu.Lock() defer w.mu.Unlock() var err error log.Printf("---------> NEWWWWWW Updating peer %+v from interface %s ", peerConf, w.Name) - peerConf.UpdateOnly = true + peerConf.UpdateOnly = updateOnly peerConf.ReplaceAllowedIPs = true config := wgtypes.Config{ Peers: []wgtypes.PeerConfig{peerConf}, diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index a5796011..e25dee7d 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -672,3 +672,13 @@ func GetStunPort() string { } return port } + +func IsProxyEnabled() bool { + var enabled = true //default + if os.Getenv("PROXY") != "" { + enabled = os.Getenv("PROXY") == "on" + } else if config.Config.Server.Proxy != "" { + enabled = config.Config.Server.Proxy == "on" + } + return enabled +}