mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-06 01:07:41 +08:00
check mq is connected before checkin, rm unused functions
This commit is contained in:
@@ -192,37 +192,10 @@ func LeaveNetwork(network string) error {
|
|||||||
if err := removeHostDNS(cfg.Node.Interface, ncutils.IsWindows()); err != nil {
|
if err := removeHostDNS(cfg.Node.Interface, ncutils.IsWindows()); err != nil {
|
||||||
logger.Log(0, "failed to delete dns entries for", cfg.Node.Interface, err.Error())
|
logger.Log(0, "failed to delete dns entries for", cfg.Node.Interface, err.Error())
|
||||||
}
|
}
|
||||||
logger.Log(2, "deleting broker keys as required")
|
|
||||||
if !brokerInUse(cfg.Server.Server) {
|
|
||||||
if err := deleteBrokerFiles(cfg.Server.Server); err != nil {
|
|
||||||
logger.Log(0, "failed to deleter certs for", cfg.Server.Server, err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.Log(2, "restarting daemon")
|
logger.Log(2, "restarting daemon")
|
||||||
return daemon.Restart()
|
return daemon.Restart()
|
||||||
}
|
}
|
||||||
|
|
||||||
func brokerInUse(broker string) bool {
|
|
||||||
networks, _ := ncutils.GetSystemNetworks()
|
|
||||||
for _, net := range networks {
|
|
||||||
cfg := config.ClientConfig{}
|
|
||||||
cfg.Network = net
|
|
||||||
cfg.ReadConfig()
|
|
||||||
if cfg.Server.Server == broker {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func deleteBrokerFiles(broker string) error {
|
|
||||||
dir := ncutils.GetNetclientServerPath(broker)
|
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func deleteNodeFromServer(cfg *config.ClientConfig) error {
|
func deleteNodeFromServer(cfg *config.ClientConfig) error {
|
||||||
node := cfg.Node
|
node := cfg.Node
|
||||||
if node.IsServer == "yes" {
|
if node.IsServer == "yes" {
|
||||||
@@ -340,6 +313,7 @@ func API(data any, method, url, authorization string) (*http.Response, error) {
|
|||||||
if authorization != "" {
|
if authorization != "" {
|
||||||
request.Header.Set("authorization", "Bearer "+authorization)
|
request.Header.Set("authorization", "Bearer "+authorization)
|
||||||
}
|
}
|
||||||
|
request.Header.Set("requestfrom", "node")
|
||||||
return HTTPClient.Do(request)
|
return HTTPClient.Do(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -2,8 +2,6 @@ package functions
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
|
||||||
"crypto/x509"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@@ -68,14 +66,18 @@ func Daemon() error {
|
|||||||
cancel()
|
cancel()
|
||||||
logger.Log(0, "shutting down netclient daemon")
|
logger.Log(0, "shutting down netclient daemon")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
if mqclient != nil {
|
||||||
mqclient.Disconnect(250)
|
mqclient.Disconnect(250)
|
||||||
|
}
|
||||||
logger.Log(0, "shutdown complete")
|
logger.Log(0, "shutdown complete")
|
||||||
return nil
|
return nil
|
||||||
case <-reset:
|
case <-reset:
|
||||||
logger.Log(0, "received reset")
|
logger.Log(0, "received reset")
|
||||||
cancel()
|
cancel()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
if mqclient != nil {
|
||||||
mqclient.Disconnect(250)
|
mqclient.Disconnect(250)
|
||||||
|
}
|
||||||
logger.Log(0, "restarting daemon")
|
logger.Log(0, "restarting daemon")
|
||||||
cancel = startGoRoutines(&wg)
|
cancel = startGoRoutines(&wg)
|
||||||
}
|
}
|
||||||
@@ -111,14 +113,7 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for {
|
|
||||||
if mqclient != nil && mqclient.IsConnected() {
|
|
||||||
go Checkin(ctx, wg)
|
go Checkin(ctx, wg)
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
return cancel
|
return cancel
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,34 +202,6 @@ func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientCon
|
|||||||
logger.Log(0, "shutting down message queue for server", cfg.Server.Server)
|
logger.Log(0, "shutting down message queue for server", cfg.Server.Server)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTLSConf sets up tls configuration to connect to broker securely
|
|
||||||
func NewTLSConfig(server string) (*tls.Config, error) {
|
|
||||||
file := ncutils.GetNetclientServerPath(server) + ncutils.GetSeparator() + "root.pem"
|
|
||||||
certpool := x509.NewCertPool()
|
|
||||||
ca, err := os.ReadFile(file)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log(0, "could not read CA file", err.Error())
|
|
||||||
}
|
|
||||||
ok := certpool.AppendCertsFromPEM(ca)
|
|
||||||
if !ok {
|
|
||||||
logger.Log(0, "failed to append cert")
|
|
||||||
}
|
|
||||||
clientKeyPair, err := tls.LoadX509KeyPair(ncutils.GetNetclientServerPath(server)+ncutils.GetSeparator()+"client.pem", ncutils.GetNetclientPath()+ncutils.GetSeparator()+"client.key")
|
|
||||||
if err != nil {
|
|
||||||
logger.Log(0, "could not read client cert/key", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
certs := []tls.Certificate{clientKeyPair}
|
|
||||||
return &tls.Config{
|
|
||||||
RootCAs: certpool,
|
|
||||||
ClientAuth: tls.NoClientCert,
|
|
||||||
ClientCAs: nil,
|
|
||||||
Certificates: certs,
|
|
||||||
InsecureSkipVerify: false,
|
|
||||||
}, nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// func setMQTTSingenton creates a connection to broker for single use (ie to publish a message)
|
// func setMQTTSingenton creates a connection to broker for single use (ie to publish a message)
|
||||||
// only to be called from cli (eg. connect/disconnect, join, leave) and not from daemon ---
|
// only to be called from cli (eg. connect/disconnect, join, leave) and not from daemon ---
|
||||||
func setupMQTTSingleton(cfg *config.ClientConfig) error {
|
func setupMQTTSingleton(cfg *config.ClientConfig) error {
|
||||||
|
@@ -29,7 +29,6 @@ var metricsCache = new(sync.Map)
|
|||||||
func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
||||||
logger.Log(2, "starting checkin goroutine")
|
logger.Log(2, "starting checkin goroutine")
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
checkin()
|
|
||||||
ticker := time.NewTicker(time.Minute * ncutils.CheckInInterval)
|
ticker := time.NewTicker(time.Minute * ncutils.CheckInInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
@@ -38,7 +37,12 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
|||||||
logger.Log(0, "checkin routine closed")
|
logger.Log(0, "checkin routine closed")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
if mqclient != nil && mqclient.IsConnected() {
|
||||||
checkin()
|
checkin()
|
||||||
|
} else {
|
||||||
|
logger.Log(0, "MQ client is not connected, skipping checkin...")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user